fix #1952, document how to use StreamCQL over Gearpump

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/75ad76cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/75ad76cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/75ad76cb

Branch: refs/heads/master
Commit: 75ad76cb386c9d95dcf907b141722aeb207f1a81
Parents: 89292ac
Author: manuzhang <[email protected]>
Authored: Thu Feb 4 11:39:02 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Thu Feb 4 12:10:54 2016 +0800

----------------------------------------------------------------------
 docs/dev-storm.md                               | 57 ++++++++++++++++++--
 .../experiments/storm/main/GearpumpNimbus.scala |  9 ++--
 .../storm/main/GearpumpStormClient.scala        |  3 +-
 3 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/75ad76cb/docs/dev-storm.md
----------------------------------------------------------------------
diff --git a/docs/dev-storm.md b/docs/dev-storm.md
index a22ed6c..0ff1127 100644
--- a/docs/dev-storm.md
+++ b/docs/dev-storm.md
@@ -80,10 +80,10 @@ This section shows how to run an existing Storm jar in a local Gearpump cluster.
    bin/local
    ```
 
-2. start a local Gearpump Nimbus server 
+2. start a Gearpump Nimbus server 
 
-   Users need server's thrift port to submit topologies later. The thrift port is written to a yaml config file set with `-output` option. 
-   Users can provide an existing config file where only `nimbus.thrift.port` is overwritten. If not provided, a new file `app.yaml` is created with the config.
+   Users need server's address(`nimbus.host` and `nimbus.thrift.port`) to submit topologies later. The address is written to a yaml config file set with `-output` option. 
+   Users can provide an existing config file where only the address will be overwritten. If not provided, a new file `app.yaml` is created with the config.
 
    ```
    bin/storm nimbus -output [conf <custom yaml config>]
@@ -172,4 +172,53 @@ where
 
 * application config is submit from Storm application along with the topology 
 * component config is set in spout / bolt with `getComponentConfiguration`
-* custom file config is specified with the `-config` option when submitting Storm application from command line or uploaded from UI
\ No newline at end of file
+* custom file config is specified with the `-config` option when submitting Storm application from command line or uploaded from UI
+
+## StreamCQL Support
+
+[StreamCQL](https://github.com/HuaweiBigData/StreamCQL) is a Continuous Query Language on RealTime Computation System open sourced by Huawei.
+Since StreamCQL already supports Storm, it's straightforward to run StreamCQL over Gearpump.
+
+1. Install StreamCQL as in the official [README](https://github.com/HuaweiBigData/StreamCQL#install-streamcql)
+
+2. Launch Gearpump Nimbus Server as before 
+
+3. Go to the installed stream-cql-binary, and change following settings in `conf/streaming-site.xml` with the output Nimbus configs in Step 2.
+
+    ```xml
+    <property>
+      <name>streaming.storm.nimbus.host</name>
+      <value>${nimbus.host}</value>
+    </property>
+    <property>
+      <name>streaming.storm.nimbus.port</name>
+      <value>${nimbus.thrift.port}</value>
+    </property>
+    ```
+ 
+4. Open CQL client shell and execute a simple cql example
+   
+   ```
+   bin/cql
+   ```
+   
+   ```
+   Streaming> CREATE INPUT STREAM s
+       (id INT, name STRING, type INT)
+   SOURCE randomgen
+       PROPERTIES ( timeUnit = "SECONDS", period = "1",
+           eventNumPerperiod = "1", isSchedule = "true" );
+   
+   CREATE OUTPUT STREAM rs
+       (type INT, cc INT)
+   SINK consoleOutput;
+   
+   INSERT INTO STREAM rs SELECT type, COUNT(id) as cc
+       FROM s[RANGE 20 SECONDS BATCH]
+       WHERE id > 5 GROUP BY type;
+   
+   SUBMIT APPLICATION example;    
+   ```
+   
+5. Check the dashboard and you should see data flowing through a topology of 3 components
+
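
Step 3 of the StreamCQL section above copies `nimbus.host` and `nimbus.thrift.port` from the Nimbus output file into `conf/streaming-site.xml` by hand. The following is a minimal shell sketch of that mapping, not part of this commit. It assumes the output file is flat `key: value` yaml with each key at the start of a line; the helper names `yaml_value`, `render_property` and `nimbus_properties` are hypothetical.

```shell
# Sketch only: assumes flat "key: value" yaml, one setting per line,
# with the value optionally wrapped in single or double quotes.

# yaml_value KEY FILE: print the value of the first line that starts with "KEY:".
yaml_value() {
  awk -v key="$1" '
    index($0, key ":") == 1 {
      value = substr($0, length(key) + 2)      # text after "KEY:"
      gsub(/^[ \t]+|[ \t]+$/, "", value)       # trim surrounding whitespace
      gsub(/^["\047]|["\047]$/, "", value)     # drop one surrounding quote pair
      print value
      exit
    }' "$2"
}

# render_property NAME VALUE: print one <property> element.
render_property() {
  printf '<property>\n  <name>%s</name>\n  <value>%s</value>\n</property>\n' "$1" "$2"
}

# nimbus_properties FILE: print the two StreamCQL settings named in Step 3.
nimbus_properties() {
  render_property streaming.storm.nimbus.host "$(yaml_value nimbus.host "$1")"
  render_property streaming.storm.nimbus.port "$(yaml_value nimbus.thrift.port "$1")"
}
```

Running `nimbus_properties app.yaml` would print the two `<property>` elements, which still have to be pasted into `conf/streaming-site.xml` manually. A real yaml parser would be the safer choice if the config file is nested or uses multi-line values.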

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/75ad76cb/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
index 96c12ae..6d5b0aa 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -39,6 +39,7 @@ import io.gearpump.streaming.StreamApplication
 import io.gearpump.util.{AkkaApp, Constants, LogUtil}
 import org.apache.storm.shade.org.json.simple.JSONValue
 import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
+import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor
 import org.slf4j.Logger
 
 import scala.collection.JavaConverters._
@@ -79,12 +80,8 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
     val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]]
     updatedConfig.putAll(outputConfig)
     updatedConfig.putAll(thriftConfig)
-    // workaround that yaml dumps {nimbus.host: 127.0.0.1} instead of {nimbus.host: '127.0.0.1'}
-    // which can not be loaded
-    val nimbusHost = s"${Config.NIMBUS_HOST}: '${thriftConfig.get(Config.NIMBUS_HOST)}'"
-    updatedConfig.remove(Config.NIMBUS_HOST)
-    val yaml = new Yaml
-    val serialized = yaml.dumpAsMap(updatedConfig) ++ s"$nimbusHost\n"
+    val yaml = new Yaml(new SafeConstructor)
+    val serialized = yaml.dumpAsMap(updatedConfig)
     val writer = new FileWriter(new File(output))
     try {
       writer.write(serialized)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/75ad76cb/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
index b813ae0..643e926 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
@@ -50,7 +50,8 @@ object GearpumpStormClient extends AkkaApp with ArgumentsParser {
       s"-D${PREFER_IPV4}=true"
     )
 
-    val classPath = Array(s"${System.getProperty(GEARPUMP_HOME)}/lib/storm/*", jar)
+    val gearpumpHome = System.getProperty(GEARPUMP_HOME)
+    val classPath = Array(s"$gearpumpHome/lib/*", s"$gearpumpHome/lib/storm/*", jar)
     val process = Util.startProcess(stormOptions, classPath, topology, stormArgs)
 
     // wait till the process exit
