Repository: incubator-gearpump
Updated Branches:
  refs/heads/master c80c06963 -> e9ea6e2f3


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
 
b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
deleted file mode 100644
index a0d9a42..0000000
--- 
a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.storm
-
-import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
-
-import backtype.storm.topology.TopologyBuilder
-import backtype.storm.{Config, StormSubmitter}
-import storm.kafka.bolt.KafkaBolt
-import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
-
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-
-/**
- * Tests Storm 0.10.x compatibility over Gearpump
- * this example reads data from Kafka and writes back to it
- */
-object Storm010KafkaTopology extends App with ArgumentsParser {
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true),
-    "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required 
= true),
-    "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = 
true),
-    "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. 
localhost:2181/kafka>",
-      required = true),
-    "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. 
localhost:9092>", required = true),
-    "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
-      defaultValue = Some(1)),
-    "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, 
defaultValue = Some(1))
-  )
-
-  val configs = parse(args)
-  val topologyName = configs.getString("topologyName")
-  val sourceTopic = configs.getString("sourceTopic")
-  val sinkTopic = configs.getString("sinkTopic")
-  val zookeeperConnect = configs.getString("zookeeperConnect")
-  val brokerList = configs.getString("brokerList")
-  val spoutNum = configs.getInt("spoutNum")
-  val boltNum = configs.getInt("boltNum")
-
-  val topologyBuilder = new TopologyBuilder()
-  val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic,
-    zookeeperConnect, topologyName))
-  val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new 
KafkaBolt[Array[Byte], Array[Byte]]()
-  val adaptor = new Adaptor
-  topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
-  topologyBuilder.setBolt("adaptor", 
adaptor).localOrShuffleGrouping("kafka_spout")
-  topologyBuilder.setBolt("kafka_bolt", kafkaBolt, 
boltNum).localOrShuffleGrouping("adaptor")
-  val config = new Config()
-  config.putAll(getBoltConfig(sinkTopic, brokerList))
-  config.put(Config.TOPOLOGY_NAME, topologyName)
-  val topology = topologyBuilder.createTopology()
-  StormSubmitter.submitTopology(topologyName, config, topology)
-
-  def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): 
SpoutConfig = {
-    val hosts = new ZkHosts(zookeeperConnect)
-    val index = zookeeperConnect.indexOf("/")
-    val zookeeper = zookeeperConnect.take(index)
-    val kafkaRoot = zookeeperConnect.drop(index)
-    val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
-
-    val serverAndPort = zookeeper.split(":")
-    config.zkServers = new JArrayList[String]
-    config.zkServers.add(serverAndPort(0))
-    config.zkPort = serverAndPort(1).toInt
-    config
-  }
-
-  def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = 
{
-    val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
-    val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
-    brokerConfig.put("metadata.broker.list", brokerList)
-    brokerConfig.put("request.required.acks", "1")
-    kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
-    kafkaConfig.put(KafkaBolt.TOPIC, topic)
-    kafkaConfig
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
 
b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
new file mode 100644
index 0000000..67a2491
--- /dev/null
+++ 
b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.storm
+
+import backtype.storm.topology.base.BaseBasicBolt
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.tuple.{Fields, Tuple, Values}
+import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
+
/**
 * Bolt that re-keys raw Kafka payloads for the downstream KafkaBolt.
 *
 * Each incoming tuple's first field (the raw message bytes) is emitted as the
 * Kafka message value, paired with a monotonically increasing counter rendered
 * as a byte string for the Kafka message key.
 */
class Adaptor extends BaseBasicBolt {
  // Monotonically increasing sequence number used as the outgoing Kafka key.
  private var id = 0L

  override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
    val payload = tuple.getBinary(0)
    val key = id.toString.getBytes
    collector.emit(new Values(key, payload))
    id += 1
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    // Field names must match what FieldNameBasedTupleToKafkaMapper expects,
    // so KafkaBolt can locate the key and message in the tuple.
    val fields = new Fields(
      FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
      FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE)
    declarer.declare(fields)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
 
b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
new file mode 100644
index 0000000..a0d9a42
--- /dev/null
+++ 
b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.storm
+
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+
+import backtype.storm.topology.TopologyBuilder
+import backtype.storm.{Config, StormSubmitter}
+import storm.kafka.bolt.KafkaBolt
+import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
+
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+
+/**
+ * Tests Storm 0.10.x compatibility over Gearpump
+ * this example reads data from Kafka and writes back to it
+ */
/**
 * Tests Storm 0.10.x compatibility over Gearpump.
 * This example reads data from Kafka and writes back to it:
 * kafka_spout -> adaptor -> kafka_bolt.
 */
object Storm010KafkaTopology extends App with ArgumentsParser {

  override val options: Array[(String, CLIOption[Any])] = Array(
    // was CLIOption[Int]; the value is a name and is read with getString below
    "topologyName" -> CLIOption[String]("<Storm topology name>", required = true),
    "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true),
    "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true),
    "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
      required = true),
    "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
    "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
      defaultValue = Some(1)),
    "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1))
  )

  val configs = parse(args)
  val topologyName = configs.getString("topologyName")
  val sourceTopic = configs.getString("sourceTopic")
  val sinkTopic = configs.getString("sinkTopic")
  val zookeeperConnect = configs.getString("zookeeperConnect")
  val brokerList = configs.getString("brokerList")
  val spoutNum = configs.getInt("spoutNum")
  val boltNum = configs.getInt("boltNum")

  // Wire spout -> adaptor -> bolt and submit the topology to Gearpump's
  // Storm-compatible submitter.
  val topologyBuilder = new TopologyBuilder()
  val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic,
    zookeeperConnect, topologyName))
  val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]()
  val adaptor = new Adaptor
  topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
  topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout")
  topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor")
  val config = new Config()
  config.putAll(getBoltConfig(sinkTopic, brokerList))
  config.put(Config.TOPOLOGY_NAME, topologyName)
  val topology = topologyBuilder.createTopology()
  StormSubmitter.submitTopology(topologyName, config, topology)

  /**
   * Builds the KafkaSpout configuration from a Zookeeper connect string.
   *
   * @param topic Kafka topic to consume from
   * @param zookeeperConnect "host[:port][/chroot]" connect string
   * @param id consumer id used by the spout to track offsets
   * @return configured SpoutConfig
   */
  def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = {
    val hosts = new ZkHosts(zookeeperConnect)
    // indexOf returns -1 when no chroot is present; previously that produced an
    // empty server string and an out-of-bounds port lookup. Treat the whole
    // string as the server address and use the root path in that case.
    val index = zookeeperConnect.indexOf("/")
    val (zookeeper, kafkaRoot) =
      if (index >= 0) (zookeeperConnect.take(index), zookeeperConnect.drop(index))
      else (zookeeperConnect, "")
    val config = new SpoutConfig(hosts, topic, kafkaRoot, id)

    val serverAndPort = zookeeper.split(":")
    config.zkServers = new JArrayList[String]
    config.zkServers.add(serverAndPort(0))
    // Fall back to Zookeeper's standard client port when none is specified.
    config.zkPort = if (serverAndPort.length > 1) serverAndPort(1).toInt else 2181
    config
  }

  /**
   * Builds the producer properties consumed by KafkaBolt.
   *
   * @param topic Kafka topic to write to
   * @param brokerList comma-separated broker addresses
   * @return map to merge into the Storm Config
   */
  def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = {
    val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
    val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
    brokerConfig.put("metadata.broker.list", brokerList)
    // Require leader acknowledgement so writes are not silently lost.
    brokerConfig.put("request.required.acks", "1")
    kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
    kafkaConfig.put(KafkaBolt.TOPIC, topic)
    kafkaConfig
  }
}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
 
b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
deleted file mode 100644
index 67a2491..0000000
--- 
a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.storm
-
-import backtype.storm.topology.base.BaseBasicBolt
-import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
-import backtype.storm.tuple.{Fields, Tuple, Values}
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
-
-class Adaptor extends BaseBasicBolt {
-  private var id = 0L
-
-  override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
-    val bytes = tuple.getBinary(0)
-    collector.emit(new Values(s"$id".getBytes, bytes))
-    id += 1
-  }
-
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
-      FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
 
b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
deleted file mode 100644
index 5b74d60..0000000
--- 
a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.storm
-
-import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
-
-import backtype.storm.topology.TopologyBuilder
-import backtype.storm.{Config, StormSubmitter}
-import storm.kafka.bolt.KafkaBolt
-import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
-
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-
-/**
- * Tests Storm 0.9.x compatibility over Gearpump
- * this example reads data from Kafka and writes back to it
- */
-object Storm09KafkaTopology extends App with ArgumentsParser {
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true),
-    "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required 
= true),
-    "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = 
true),
-    "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. 
localhost:2181/kafka>",
-      required = true),
-    "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. 
localhost:9092>", required = true),
-    "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
-      defaultValue = Some(1)),
-    "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, 
defaultValue = Some(1))
-  )
-
-  val configs = parse(args)
-  val topologyName = configs.getString("topologyName")
-  val sourceTopic = configs.getString("sourceTopic")
-  val sinkTopic = configs.getString("sinkTopic")
-  val zookeeperConnect = configs.getString("zookeeperConnect")
-  val brokerList = configs.getString("brokerList")
-  val spoutNum = configs.getInt("spoutNum")
-  val boltNum = configs.getInt("boltNum")
-
-  val topologyBuilder = new TopologyBuilder()
-  val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic, 
zookeeperConnect,
-    topologyName))
-  val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new 
KafkaBolt[Array[Byte], Array[Byte]]()
-  val adaptor = new Adaptor
-  topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
-  topologyBuilder.setBolt("adaptor", 
adaptor).localOrShuffleGrouping("kafka_spout")
-  topologyBuilder.setBolt("kafka_bolt", kafkaBolt, 
boltNum).localOrShuffleGrouping("adaptor")
-  val config = new Config()
-  config.putAll(getBoltConfig(sinkTopic, brokerList))
-  config.put(Config.TOPOLOGY_NAME, topologyName)
-  val topology = topologyBuilder.createTopology()
-  StormSubmitter.submitTopology(topologyName, config, topology)
-
-  def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): 
SpoutConfig = {
-    val hosts = new ZkHosts(zookeeperConnect)
-    val index = zookeeperConnect.indexOf("/")
-    val zookeeper = zookeeperConnect.take(index)
-    val kafkaRoot = zookeeperConnect.drop(index)
-    val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
-    config.forceFromStart = true
-
-    val serverAndPort = zookeeper.split(":")
-    config.zkServers = new JArrayList[String]
-    config.zkServers.add(serverAndPort(0))
-    config.zkPort = serverAndPort(1).toInt
-    config
-  }
-
-  def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = 
{
-    val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
-    val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
-    brokerConfig.put("metadata.broker.list", brokerList)
-    brokerConfig.put("request.required.acks", "1")
-    kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
-    kafkaConfig.put(KafkaBolt.TOPIC, topic)
-    kafkaConfig
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
 
b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
new file mode 100644
index 0000000..67a2491
--- /dev/null
+++ 
b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.storm
+
+import backtype.storm.topology.base.BaseBasicBolt
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.tuple.{Fields, Tuple, Values}
+import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
+
/**
 * Bolt that re-keys raw Kafka payloads for the downstream KafkaBolt.
 *
 * Each incoming tuple's first field (the raw message bytes) is emitted as the
 * Kafka message value, paired with a monotonically increasing counter rendered
 * as a byte string for the Kafka message key.
 */
class Adaptor extends BaseBasicBolt {
  // Monotonically increasing sequence number used as the outgoing Kafka key.
  private var id = 0L

  override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
    val payload = tuple.getBinary(0)
    val key = id.toString.getBytes
    collector.emit(new Values(key, payload))
    id += 1
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    // Field names must match what FieldNameBasedTupleToKafkaMapper expects,
    // so KafkaBolt can locate the key and message in the tuple.
    val fields = new Fields(
      FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
      FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE)
    declarer.declare(fields)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
 
b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
new file mode 100644
index 0000000..5b74d60
--- /dev/null
+++ 
b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.storm
+
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+
+import backtype.storm.topology.TopologyBuilder
+import backtype.storm.{Config, StormSubmitter}
+import storm.kafka.bolt.KafkaBolt
+import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
+
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+
+/**
+ * Tests Storm 0.9.x compatibility over Gearpump
+ * this example reads data from Kafka and writes back to it
+ */
/**
 * Tests Storm 0.9.x compatibility over Gearpump.
 * This example reads data from Kafka and writes back to it:
 * kafka_spout -> adaptor -> kafka_bolt.
 */
object Storm09KafkaTopology extends App with ArgumentsParser {

  override val options: Array[(String, CLIOption[Any])] = Array(
    // was CLIOption[Int]; the value is a name and is read with getString below
    "topologyName" -> CLIOption[String]("<Storm topology name>", required = true),
    "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true),
    "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true),
    "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
      required = true),
    "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
    "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
      defaultValue = Some(1)),
    "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1))
  )

  val configs = parse(args)
  val topologyName = configs.getString("topologyName")
  val sourceTopic = configs.getString("sourceTopic")
  val sinkTopic = configs.getString("sinkTopic")
  val zookeeperConnect = configs.getString("zookeeperConnect")
  val brokerList = configs.getString("brokerList")
  val spoutNum = configs.getInt("spoutNum")
  val boltNum = configs.getInt("boltNum")

  // Wire spout -> adaptor -> bolt and submit the topology to Gearpump's
  // Storm-compatible submitter.
  val topologyBuilder = new TopologyBuilder()
  val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic, zookeeperConnect,
    topologyName))
  val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]()
  val adaptor = new Adaptor
  topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
  topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout")
  topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor")
  val config = new Config()
  config.putAll(getBoltConfig(sinkTopic, brokerList))
  config.put(Config.TOPOLOGY_NAME, topologyName)
  val topology = topologyBuilder.createTopology()
  StormSubmitter.submitTopology(topologyName, config, topology)

  /**
   * Builds the KafkaSpout configuration from a Zookeeper connect string.
   *
   * @param topic Kafka topic to consume from
   * @param zookeeperConnect "host[:port][/chroot]" connect string
   * @param id consumer id used by the spout to track offsets
   * @return configured SpoutConfig
   */
  def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = {
    val hosts = new ZkHosts(zookeeperConnect)
    // indexOf returns -1 when no chroot is present; previously that produced an
    // empty server string and an out-of-bounds port lookup. Treat the whole
    // string as the server address and use the root path in that case.
    val index = zookeeperConnect.indexOf("/")
    val (zookeeper, kafkaRoot) =
      if (index >= 0) (zookeeperConnect.take(index), zookeeperConnect.drop(index))
      else (zookeeperConnect, "")
    val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
    // Replay the topic from the earliest offset on each run (0.9.x spout flag),
    // so the integration test sees all data regardless of stored offsets.
    config.forceFromStart = true

    val serverAndPort = zookeeper.split(":")
    config.zkServers = new JArrayList[String]
    config.zkServers.add(serverAndPort(0))
    // Fall back to Zookeeper's standard client port when none is specified.
    config.zkPort = if (serverAndPort.length > 1) serverAndPort(1).toInt else 2181
    config
  }

  /**
   * Builds the producer properties consumed by KafkaBolt.
   *
   * @param topic Kafka topic to write to
   * @param brokerList comma-separated broker addresses
   * @return map to merge into the Storm Config
   */
  def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = {
    val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
    val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
    brokerConfig.put("metadata.broker.list", brokerList)
    // Require leader acknowledgement so writes are not silently lost.
    brokerConfig.put("request.required.acks", "1")
    kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
    kafkaConfig.put(KafkaBolt.TOPIC, topic)
    kafkaConfig
  }
}
+

Reply via email to