Repository: incubator-gearpump Updated Branches: refs/heads/master c80c06963 -> e9ea6e2f3
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala deleted file mode 100644 index a0d9a42..0000000 --- a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.integrationtest.storm - -import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} - -import backtype.storm.topology.TopologyBuilder -import backtype.storm.{Config, StormSubmitter} -import storm.kafka.bolt.KafkaBolt -import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts} - -import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} - -/** - * Tests Storm 0.10.x compatibility over Gearpump - * this example reads data from Kafka and writes back to it - */ -object Storm010KafkaTopology extends App with ArgumentsParser { - - override val options: Array[(String, CLIOption[Any])] = Array( - "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true), - "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true), - "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true), - "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>", - required = true), - "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true), - "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false, - defaultValue = Some(1)), - "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1)) - ) - - val configs = parse(args) - val topologyName = configs.getString("topologyName") - val sourceTopic = configs.getString("sourceTopic") - val sinkTopic = configs.getString("sinkTopic") - val zookeeperConnect = configs.getString("zookeeperConnect") - val brokerList = configs.getString("brokerList") - val spoutNum = configs.getInt("spoutNum") - val boltNum = configs.getInt("boltNum") - - val topologyBuilder = new TopologyBuilder() - val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic, - zookeeperConnect, topologyName)) - val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]() - val adaptor = new Adaptor - topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum) - topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout") - topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor") - val config = new Config() - config.putAll(getBoltConfig(sinkTopic, brokerList)) - config.put(Config.TOPOLOGY_NAME, topologyName) - val topology = topologyBuilder.createTopology() - StormSubmitter.submitTopology(topologyName, config, topology) - - def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = { - val hosts = new ZkHosts(zookeeperConnect) - val index = zookeeperConnect.indexOf("/") - val zookeeper = zookeeperConnect.take(index) - val kafkaRoot = zookeeperConnect.drop(index) - val config = new SpoutConfig(hosts, topic, kafkaRoot, id) - - val serverAndPort = zookeeper.split(":") - config.zkServers = new JArrayList[String] - config.zkServers.add(serverAndPort(0)) - config.zkPort = serverAndPort(1).toInt - config - } - - def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = { - val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef] - val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef] - brokerConfig.put("metadata.broker.list", brokerList) - brokerConfig.put("request.required.acks", "1") - kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig) - kafkaConfig.put(KafkaBolt.TOPIC, topic) - kafkaConfig - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala new file mode 100644 index 0000000..67a2491 --- /dev/null +++ b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.storm + +import backtype.storm.topology.base.BaseBasicBolt +import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer} +import backtype.storm.tuple.{Fields, Tuple, Values} +import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper + +class Adaptor extends BaseBasicBolt { + private var id = 0L + + override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = { + val bytes = tuple.getBinary(0) + collector.emit(new Values(s"$id".getBytes, bytes)) + id += 1 + } + + override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { + declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, + FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala new file mode 100644 index 0000000..a0d9a42 --- /dev/null +++ b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.integrationtest.storm + +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} + +import backtype.storm.topology.TopologyBuilder +import backtype.storm.{Config, StormSubmitter} +import storm.kafka.bolt.KafkaBolt +import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts} + +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} + +/** + * Tests Storm 0.10.x compatibility over Gearpump + * this example reads data from Kafka and writes back to it + */ +object Storm010KafkaTopology extends App with ArgumentsParser { + + override val options: Array[(String, CLIOption[Any])] = Array( + "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true), + "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true), + "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true), + "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>", + required = true), + "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true), + "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false, + defaultValue = Some(1)), + "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1)) + ) + + val configs = parse(args) + val topologyName = configs.getString("topologyName") + val sourceTopic = configs.getString("sourceTopic") + val sinkTopic = configs.getString("sinkTopic") + val zookeeperConnect = configs.getString("zookeeperConnect") + val brokerList = configs.getString("brokerList") + val spoutNum = configs.getInt("spoutNum") + val boltNum = configs.getInt("boltNum") + + val topologyBuilder = new TopologyBuilder() + val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic, + zookeeperConnect, topologyName)) + val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]() + val adaptor = new Adaptor + topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum) + topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout") + topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor") + val config = new Config() + config.putAll(getBoltConfig(sinkTopic, brokerList)) + config.put(Config.TOPOLOGY_NAME, topologyName) + val topology = topologyBuilder.createTopology() + StormSubmitter.submitTopology(topologyName, config, topology) + + def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = { + val hosts = new ZkHosts(zookeeperConnect) + val index = zookeeperConnect.indexOf("/") + val zookeeper = zookeeperConnect.take(index) + val kafkaRoot = zookeeperConnect.drop(index) + val config = new SpoutConfig(hosts, topic, kafkaRoot, id) + + val serverAndPort = zookeeper.split(":") + config.zkServers = new JArrayList[String] + config.zkServers.add(serverAndPort(0)) + config.zkPort = serverAndPort(1).toInt + config + } + + def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = { + val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef] + val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef] + brokerConfig.put("metadata.broker.list", brokerList) + brokerConfig.put("request.required.acks", "1") + kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig) + kafkaConfig.put(KafkaBolt.TOPIC, topic) + kafkaConfig + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala deleted file mode 100644 index 67a2491..0000000 --- a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.storm - -import backtype.storm.topology.base.BaseBasicBolt -import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer} -import backtype.storm.tuple.{Fields, Tuple, Values} -import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper - -class Adaptor extends BaseBasicBolt { - private var id = 0L - - override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = { - val bytes = tuple.getBinary(0) - collector.emit(new Values(s"$id".getBytes, bytes)) - id += 1 - } - - override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { - declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, - FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala deleted file mode 100644 index 5b74d60..0000000 --- a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.storm - -import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} - -import backtype.storm.topology.TopologyBuilder -import backtype.storm.{Config, StormSubmitter} -import storm.kafka.bolt.KafkaBolt -import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts} - -import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} - -/** - * Tests Storm 0.9.x compatibility over Gearpump - * this example reads data from Kafka and writes back to it - */ -object Storm09KafkaTopology extends App with ArgumentsParser { - - override val options: Array[(String, CLIOption[Any])] = Array( - "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true), - "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true), - "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true), - "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>", - required = true), - "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true), - "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false, - defaultValue = Some(1)), - "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1)) - ) - - val configs = parse(args) - val topologyName = configs.getString("topologyName") - val sourceTopic = configs.getString("sourceTopic") - val sinkTopic = configs.getString("sinkTopic") - val zookeeperConnect = configs.getString("zookeeperConnect") - val brokerList = configs.getString("brokerList") - val spoutNum = configs.getInt("spoutNum") - val boltNum = configs.getInt("boltNum") - - val topologyBuilder = new TopologyBuilder() - val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic, zookeeperConnect, - topologyName)) - val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]() - val adaptor = new Adaptor - topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum) - topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout") - topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor") - val config = new Config() - config.putAll(getBoltConfig(sinkTopic, brokerList)) - config.put(Config.TOPOLOGY_NAME, topologyName) - val topology = topologyBuilder.createTopology() - StormSubmitter.submitTopology(topologyName, config, topology) - - def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = { - val hosts = new ZkHosts(zookeeperConnect) - val index = zookeeperConnect.indexOf("/") - val zookeeper = zookeeperConnect.take(index) - val kafkaRoot = zookeeperConnect.drop(index) - val config = new SpoutConfig(hosts, topic, kafkaRoot, id) - config.forceFromStart = true - - val serverAndPort = zookeeper.split(":") - config.zkServers = new JArrayList[String] - config.zkServers.add(serverAndPort(0)) - config.zkPort = serverAndPort(1).toInt - config - } - - def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = { - val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef] - val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef] - brokerConfig.put("metadata.broker.list", brokerList) - brokerConfig.put("request.required.acks", "1") - kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig) - kafkaConfig.put(KafkaBolt.TOPIC, topic) - kafkaConfig - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala new file mode 100644 index 0000000..67a2491 --- /dev/null +++ b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.storm + +import backtype.storm.topology.base.BaseBasicBolt +import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer} +import backtype.storm.tuple.{Fields, Tuple, Values} +import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper + +class Adaptor extends BaseBasicBolt { + private var id = 0L + + override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = { + val bytes = tuple.getBinary(0) + collector.emit(new Values(s"$id".getBytes, bytes)) + id += 1 + } + + override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { + declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, + FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala new file mode 100644 index 0000000..5b74d60 --- /dev/null +++ b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.storm + +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} + +import backtype.storm.topology.TopologyBuilder +import backtype.storm.{Config, StormSubmitter} +import storm.kafka.bolt.KafkaBolt +import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts} + +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} + +/** + * Tests Storm 0.9.x compatibility over Gearpump + * this example reads data from Kafka and writes back to it + */ +object Storm09KafkaTopology extends App with ArgumentsParser { + + override val options: Array[(String, CLIOption[Any])] = Array( + "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true), + "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true), + "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true), + "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>", + required = true), + "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true), + "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false, + defaultValue = Some(1)), + "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1)) + ) + + val configs = parse(args) + val topologyName = configs.getString("topologyName") + val sourceTopic = configs.getString("sourceTopic") + val sinkTopic = configs.getString("sinkTopic") + val zookeeperConnect = configs.getString("zookeeperConnect") + val brokerList = configs.getString("brokerList") + val spoutNum = configs.getInt("spoutNum") + val boltNum = configs.getInt("boltNum") + + val topologyBuilder = new TopologyBuilder() + val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic, zookeeperConnect, + topologyName)) + val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]() + val adaptor = new Adaptor + topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum) + topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout") + topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor") + val config = new Config() + config.putAll(getBoltConfig(sinkTopic, brokerList)) + config.put(Config.TOPOLOGY_NAME, topologyName) + val topology = topologyBuilder.createTopology() + StormSubmitter.submitTopology(topologyName, config, topology) + + def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = { + val hosts = new ZkHosts(zookeeperConnect) + val index = zookeeperConnect.indexOf("/") + val zookeeper = zookeeperConnect.take(index) + val kafkaRoot = zookeeperConnect.drop(index) + val config = new SpoutConfig(hosts, topic, kafkaRoot, id) + config.forceFromStart = true + + val serverAndPort = zookeeper.split(":") + config.zkServers = new JArrayList[String] + config.zkServers.add(serverAndPort(0)) + config.zkPort = serverAndPort(1).toInt + config + } + + def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = { + val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef] + val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef] + brokerConfig.put("metadata.broker.list", brokerList) + brokerConfig.put("request.required.acks", "1") + kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig) + kafkaConfig.put(KafkaBolt.TOPIC, topic) + kafkaConfig + } +} +
