fix #1952, document how to use StreamCQL over Gearpump
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/75ad76cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/75ad76cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/75ad76cb

Branch: refs/heads/master
Commit: 75ad76cb386c9d95dcf907b141722aeb207f1a81
Parents: 89292ac
Author: manuzhang
Authored: Thu Feb 4 11:39:02 2016 +0800
Committer: manuzhang
Committed: Thu Feb 4 12:10:54 2016 +0800

----------------------------------------------------------------------
 docs/dev-storm.md                                  | 57 ++++++++++++++++++--
 .../experiments/storm/main/GearpumpNimbus.scala    |  9 ++--
 .../storm/main/GearpumpStormClient.scala           |  3 +-
 3 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/75ad76cb/docs/dev-storm.md
----------------------------------------------------------------------
diff --git a/docs/dev-storm.md b/docs/dev-storm.md
index a22ed6c..0ff1127 100644
--- a/docs/dev-storm.md
+++ b/docs/dev-storm.md
@@ -80,10 +80,10 @@ This section shows how to run an existing Storm jar in a local Gearpump cluster.
    bin/local
    ```
 
-2. start a local Gearpump Nimbus server
+2. start a Gearpump Nimbus server
 
-   Users need server's thrift port to submit topologies later. The thrift port is written to a yaml config file set with `-output` option.
-   Users can provide an existing config file where only `nimbus.thrift.port` is overwritten. If not provided, a new file `app.yaml` is created with the config.
+   Users need server's address(`nimbus.host` and `nimbus.thrift.port`) to submit topologies later. The address is written to a yaml config file set with `-output` option.
+   Users can provide an existing config file where only the address will be overwritten. If not provided, a new file `app.yaml` is created with the config.
 
    ```
    bin/storm nimbus -output [conf <custom yaml config>]
@@ -172,4 +172,53 @@ where
 
 * application config is submit from Storm application along with the topology
 * component config is set in spout / bolt with `getComponentConfiguration`
-* custom file config is specified with the `-config` option when submitting Storm application from command line or uploaded from UI
\ No newline at end of file
+* custom file config is specified with the `-config` option when submitting Storm application from command line or uploaded from UI
+
+## StreamCQL Support
+
+[StreamCQL](https://github.com/HuaweiBigData/StreamCQL) is a Continuous Query Language on RealTime Computation System open sourced by Huawei.
+Since StreamCQL already supports Storm, it's straightforward to run StreamCQL over Gearpump.
+
+1. Install StreamCQL as in the official [README](https://github.com/HuaweiBigData/StreamCQL#install-streamcql)
+
+2. Launch Gearpump Nimbus Server as before
+
+3. Go to the installed stream-cql-binary, and change following settings in `conf/streaming-site.xml` with the output Nimbus configs in Step 2.
+
+   ```xml
+   <property>
+     <name>streaming.storm.nimbus.host</name>
+     <value>${nimbus.host}</value>
+   </property>
+   <property>
+     <name>streaming.storm.nimbus.port</name>
+     <value>${nimbus.thrift.port}</value>
+   </property>
+   ```
+
+4. Open CQL client shell and execute a simple cql example
+
+   ```
+   bin/cql
+   ```
+
+   ```
+   Streaming> CREATE INPUT STREAM s
+       (id INT, name STRING, type INT)
+   SOURCE randomgen
+       PROPERTIES ( timeUnit = "SECONDS", period = "1",
+       eventNumPerperiod = "1", isSchedule = "true" );
+
+   CREATE OUTPUT STREAM rs
+       (type INT, cc INT)
+   SINK consoleOutput;
+
+   INSERT INTO STREAM rs SELECT type, COUNT(id) as cc
+       FROM s[RANGE 20 SECONDS BATCH]
+       WHERE id > 5 GROUP BY type;
+
+   SUBMIT APPLICATION example;
+   ```
+
+5. Check the dashboard and you should see data flowing through a topology of 3 components
+


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/75ad76cb/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
index 96c12ae..6d5b0aa 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -39,6 +39,7 @@ import io.gearpump.streaming.StreamApplication
 import io.gearpump.util.{AkkaApp, Constants, LogUtil}
 import org.apache.storm.shade.org.json.simple.JSONValue
 import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
+import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor
 import org.slf4j.Logger
 
 import scala.collection.JavaConverters._
@@ -79,12 +80,8 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
       val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]]
       updatedConfig.putAll(outputConfig)
       updatedConfig.putAll(thriftConfig)
-      // workaround that yaml dumps {nimbus.host: 127.0.0.1} instead of {nimbus.host: '127.0.0.1'}
-      // which can not be loaded
-      val nimbusHost = s"${Config.NIMBUS_HOST}: '${thriftConfig.get(Config.NIMBUS_HOST)}'"
-      updatedConfig.remove(Config.NIMBUS_HOST)
-      val yaml = new Yaml
-      val serialized = yaml.dumpAsMap(updatedConfig) ++ s"$nimbusHost\n"
+      val yaml = new Yaml(new SafeConstructor)
+      val serialized = yaml.dumpAsMap(updatedConfig)
       val writer = new FileWriter(new File(output))
       try {
         writer.write(serialized)
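The two halves of this commit connect here. GearpumpNimbus writes `nimbus.host` and `nimbus.thrift.port` to the `-output` YAML file, and Step 3 of the new StreamCQL section asks for those same values in `conf/streaming-site.xml`. The following is a minimal Scala sketch, not part of Gearpump or StreamCQL, that reads the output file and prints the two `<property>` entries. The object name `NimbusToStreamCql` is hypothetical. The parser is deliberately naive: it assumes every needed value sits on one flat `key: value` line. Nested or multi-line YAML would need a real YAML parser.

```scala
import scala.io.Source

// Hypothetical helper (not Gearpump/StreamCQL code): turns the Nimbus -output YAML
// into the <property> entries expected in StreamCQL's conf/streaming-site.xml.
object NimbusToStreamCql {

  // Removes one pair of matching single or double quotes, e.g. '127.0.0.1' -> 127.0.0.1
  private def unquote(s: String): String =
    if (s.length >= 2 && (s.head == '\'' || s.head == '"') && s.last == s.head)
      s.substring(1, s.length - 1)
    else s

  // Assumption: flat `key: value` lines only; comments, list items and blank lines are skipped.
  def parseFlatYaml(lines: Iterator[String]): Map[String, String] =
    lines
      .map(_.trim)
      .filter(l => l.nonEmpty && !l.startsWith("#") && l.contains(":"))
      .map { l =>
        val idx = l.indexOf(':') // split on the first colon only
        l.substring(0, idx).trim -> unquote(l.substring(idx + 1).trim)
      }
      .toMap

  // Renders the two StreamCQL properties from the Nimbus address; fails if a key is missing.
  def toStreamingSiteProperties(conf: Map[String, String]): String = {
    val host = conf.getOrElse("nimbus.host", sys.error("nimbus.host missing"))
    val port = conf.getOrElse("nimbus.thrift.port", sys.error("nimbus.thrift.port missing"))
    s"""<property>
       |  <name>streaming.storm.nimbus.host</name>
       |  <value>$host</value>
       |</property>
       |<property>
       |  <name>streaming.storm.nimbus.port</name>
       |  <value>$port</value>
       |</property>""".stripMargin
  }

  def main(args: Array[String]): Unit = {
    // Path of the file given to `bin/storm nimbus -output`; app.yaml is the documented default name.
    val path = args.headOption.getOrElse("app.yaml")
    val source = Source.fromFile(path)
    try println(toStreamingSiteProperties(parseFlatYaml(source.getLines())))
    finally source.close()
  }
}
```

The printed snippet can then be pasted over the two existing properties in `conf/streaming-site.xml`.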


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/75ad76cb/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
index b813ae0..643e926 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
@@ -50,7 +50,8 @@ object GearpumpStormClient extends AkkaApp with ArgumentsParser {
       s"-D${PREFER_IPV4}=true"
     )
 
-    val classPath = Array(s"${System.getProperty(GEARPUMP_HOME)}/lib/storm/*", jar)
+    val gearpumpHome = System.getProperty(GEARPUMP_HOME)
+    val classPath = Array(s"$gearpumpHome/lib/*", s"$gearpumpHome/lib/storm/*", jar)
     val process = Util.startProcess(stormOptions, classPath, topology, stormArgs)
 
     // wait till the process exit
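With this change the Storm client launches the topology process with `lib/*` under the Gearpump home directory on the classpath, in addition to `lib/storm/*` and the topology jar. The sketch below uses a hypothetical helper, `StormClientClassPath`, which is not Gearpump code, to show the resulting entries joined into a single `-cp` value. The body of `Util.startProcess` is not part of this diff, so joining with `File.pathSeparator` is an assumption about how the array is ultimately used. Entries of the form `dir/*` are Java classpath wildcards, which the `java` launcher expands to every `.jar` file in that directory.

```scala
import java.io.File

// Hypothetical helper (not Gearpump code) mirroring the classPath array built after this commit.
object StormClientClassPath {

  // Same entries, in the same order, as the updated GearpumpStormClient.
  def entries(gearpumpHome: String, jar: String): Array[String] =
    Array(s"$gearpumpHome/lib/*", s"$gearpumpHome/lib/storm/*", jar)

  // Assumption: the entries end up as one -cp value joined with ':' (Unix) or ';' (Windows).
  def asClassPathString(gearpumpHome: String, jar: String): String =
    entries(gearpumpHome, jar).mkString(File.pathSeparator)

  def main(args: Array[String]): Unit = {
    require(args.length == 2, "usage: StormClientClassPath <gearpump-home> <topology-jar>")
    println(asClassPathString(args(0), args(1)))
  }
}
```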
