Fixing checkstyle problems in storm-hdfs-examples
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e8ffac45 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e8ffac45 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e8ffac45 Branch: refs/heads/master Commit: e8ffac458baf6cc7a90dd0ced78cc01f0ffb4229 Parents: f7f3524 Author: Kishor Patil <[email protected]> Authored: Mon Apr 23 00:44:09 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 02:32:42 2018 -0400 ---------------------------------------------------------------------- examples/storm-hdfs-examples/pom.xml | 2 +- .../storm/hdfs/bolt/HdfsFileTopology.java | 54 ++-- .../storm/hdfs/bolt/SequenceFileTopology.java | 58 ++-- .../storm/hdfs/spout/HdfsSpoutTopology.java | 307 +++++++++---------- .../storm/hdfs/trident/FixedBatchSpout.java | 29 +- .../storm/hdfs/trident/TridentFileTopology.java | 30 +- .../hdfs/trident/TridentSequenceTopology.java | 30 +- 7 files changed, 248 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-hdfs-examples/pom.xml b/examples/storm-hdfs-examples/pom.xml index c2271fe..30b07e6 100644 --- a/examples/storm-hdfs-examples/pom.xml +++ b/examples/storm-hdfs-examples/pom.xml @@ -89,7 +89,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>224</maxAllowedViolations> + <maxAllowedViolations>29</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java ---------------------------------------------------------------------- diff --git 
a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java index f8d69ca..e3599d5 100644 --- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java +++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
*/ + package org.apache.storm.hdfs.bolt; import java.io.FileInputStream; @@ -23,7 +18,6 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; @@ -65,12 +59,12 @@ public class HdfsFileTopology { FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES); FileNameFormat fileNameFormat = new DefaultFileNameFormat() - .withPath("/tmp/foo/") - .withExtension(".txt"); + .withPath("/tmp/foo/") + .withExtension(".txt"); // use "|" instead of "," for field delimiter RecordFormat format = new DelimitedRecordFormat() - .withFieldDelimiter("|"); + .withFieldDelimiter("|"); Yaml yaml = new Yaml(); InputStream in = new FileInputStream(args[1]); @@ -79,20 +73,20 @@ public class HdfsFileTopology { config.put("hdfs.config", yamlConf); HdfsBolt bolt = new HdfsBolt() - .withConfigKey("hdfs.config") - .withFsUrl(args[0]) - .withFileNameFormat(fileNameFormat) - .withRecordFormat(format) - .withRotationPolicy(rotationPolicy) - .withSyncPolicy(syncPolicy) - .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/")); + .withConfigKey("hdfs.config") + .withFsUrl(args[0]) + .withFileNameFormat(fileNameFormat) + .withRecordFormat(format) + .withRotationPolicy(rotationPolicy) + .withSyncPolicy(syncPolicy) + .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/")); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, spout, 1); // SentenceSpout --> MyBolt builder.setBolt(BOLT_ID, bolt, 4) - .shuffleGrouping(SENTENCE_SPOUT_ID); + .shuffleGrouping(SENTENCE_SPOUT_ID); String topoName = TOPOLOGY_NAME; if (args.length == 3) { topoName = args[2]; @@ -114,11 +108,11 @@ public class HdfsFileTopology { private ConcurrentHashMap<UUID, Values> pending; private SpoutOutputCollector collector; private 
String[] sentences = { - "my dog has fleas", - "i like cold beverages", - "the dog ate my homework", - "don't have a cow man", - "i don't think i like fleas" + "my dog has fleas", + "i like cold beverages", + "the dog ate my homework", + "don't have a cow man", + "i don't think i like fleas" }; private int index = 0; private int count = 0; @@ -145,7 +139,7 @@ public class HdfsFileTopology { } count++; total++; - if(count > 20000){ + if (count > 20000) { count = 0; System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total); } http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java index 2707af5..8ddd045 100644 --- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java +++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt; import java.io.FileInputStream; @@ -23,7 +18,6 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; - import org.apache.hadoop.io.SequenceFile; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; @@ -66,8 +60,8 @@ public class SequenceFileTopology { FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); FileNameFormat fileNameFormat = new DefaultFileNameFormat() - .withPath("/tmp/source/") - .withExtension(".seq"); + .withPath("/tmp/source/") + .withExtension(".seq"); // create sequence format instance. 
DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence"); @@ -79,22 +73,22 @@ public class SequenceFileTopology { config.put("hdfs.config", yamlConf); SequenceFileBolt bolt = new SequenceFileBolt() - .withFsUrl(args[0]) - .withConfigKey("hdfs.config") - .withFileNameFormat(fileNameFormat) - .withSequenceFormat(format) - .withRotationPolicy(rotationPolicy) - .withSyncPolicy(syncPolicy) - .withCompressionType(SequenceFile.CompressionType.RECORD) - .withCompressionCodec("deflate") - .addRotationAction(new MoveFileAction().toDestination("/tmp/dest/")); + .withFsUrl(args[0]) + .withConfigKey("hdfs.config") + .withFileNameFormat(fileNameFormat) + .withSequenceFormat(format) + .withRotationPolicy(rotationPolicy) + .withSyncPolicy(syncPolicy) + .withCompressionType(SequenceFile.CompressionType.RECORD) + .withCompressionCodec("deflate") + .addRotationAction(new MoveFileAction().toDestination("/tmp/dest/")); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, spout, 1); // SentenceSpout --> MyBolt builder.setBolt(BOLT_ID, bolt, 4) - .shuffleGrouping(SENTENCE_SPOUT_ID); + .shuffleGrouping(SENTENCE_SPOUT_ID); String topoName = TOPOLOGY_NAME; if (args.length == 3) { @@ -120,11 +114,11 @@ public class SequenceFileTopology { private ConcurrentHashMap<UUID, Values> pending; private SpoutOutputCollector collector; private String[] sentences = { - "my dog has fleas", - "i like cold beverages", - "the dog ate my homework", - "don't have a cow man", - "i don't think i like fleas" + "my dog has fleas", + "i like cold beverages", + "the dog ate my homework", + "don't have a cow man", + "i don't think i like fleas" }; private int index = 0; private int count = 0; @@ -151,7 +145,7 @@ public class SequenceFileTopology { } count++; total++; - if(count > 20000){ + if (count > 20000) { count = 0; System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total); } @@ -159,7 +153,7 @@ public class 
SequenceFileTopology { } public void ack(Object msgId) { -// System.out.println("ACK"); + // System.out.println("ACK"); this.pending.remove(msgId); } http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java index 943ca60..e3282d8 100644 --- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java +++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java @@ -1,192 +1,187 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; +import java.util.Map; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.ExecutorSummary; import org.apache.storm.generated.KillOptions; import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.SpoutStats; import org.apache.storm.generated.TopologyInfo; import org.apache.storm.generated.TopologySummary; -import org.apache.storm.generated.ExecutorSummary; -import org.apache.storm.generated.SpoutStats; -import org.apache.storm.generated.ClusterSummary; import org.apache.storm.metric.LoggingMetricsConsumer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.utils.Utils; -import org.apache.storm.utils.NimbusClient; import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - public class HdfsSpoutTopology { - public static final String SPOUT_ID = "hdfsspout"; - public static final String BOLT_ID = "constbolt"; - + public static final String 
SPOUT_ID = "hdfsspout"; + public static final String BOLT_ID = "constbolt"; + + /** + * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming + */ + public static void main(String[] args) throws Exception { + // 0 - validate args + if (args.length < 7) { + System.err.println("Please check command line arguments."); + System.err.println("Usage :"); + System.err.println( + HdfsSpoutTopology.class.toString() + " topologyName hdfsUri fileFormat sourceDir sourceArchiveDir badDir destinationDir."); + System.err.println(" topologyName - topology name."); + System.err.println(" hdfsUri - hdfs name node URI"); + System.err.println(" fileFormat - Set to 'TEXT' for reading text files or 'SEQ' for sequence files."); + System.err.println(" sourceDir - read files from this HDFS dir using HdfsSpout."); + System.err.println(" archiveDir - after a file in sourceDir is read completely, it is moved to this HDFS location."); + System.err.println(" badDir - files that cannot be read properly will be moved to this HDFS location."); + System.err.println(" spoutCount - Num of spout instances."); + System.err.println(); + System.exit(-1); + } - public static class ConstBolt extends BaseRichBolt { - private static final long serialVersionUID = -5313598399155365865L; - public static final String FIELDS = "message"; - private OutputCollector collector; - private static final Logger log = LoggerFactory.getLogger(ConstBolt.class); - int count =0; + // 1 - parse cmd line args + String topologyName = args[0]; + String hdfsUri = args[1]; + String fileFormat = args[2]; + String sourceDir = args[3]; + String archiveDir = args[4]; + String badDir = args[5]; + int spoutNum = Integer.parseInt(args[6]); + + // 2 - create and configure spout and bolt + ConstBolt bolt = new ConstBolt(); + + HdfsSpout spout = new HdfsSpout().withOutputFields(TextFileReader.defaultFields) + .setReaderType(fileFormat) + .setHdfsUri(hdfsUri) + 
.setSourceDir(sourceDir) + .setArchiveDir(archiveDir) + .setBadFilesDir(badDir); + + // 3 - Create and configure topology + Config conf = new Config(); + conf.setNumWorkers(1); + conf.setNumAckers(1); + conf.setMaxTaskParallelism(1); + conf.setDebug(true); + conf.registerMetricsConsumer(LoggingMetricsConsumer.class); + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(SPOUT_ID, spout, spoutNum); + builder.setBolt(BOLT_ID, bolt, 1).shuffleGrouping(SPOUT_ID); + + // 4 - submit topology, wait for a few min and terminate it + Map<String, Object> clusterConf = Utils.readStormConfig(); + StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); + Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + + // 5 - Print metrics every 30 sec, kill topology after 20 min + for (int i = 0; i < 40; i++) { + Thread.sleep(30 * 1000); + printMetrics(client, topologyName); + } + kill(client, topologyName); + } // main - public ConstBolt() { + private static void kill(Nimbus.Iface client, String topologyName) throws Exception { + KillOptions opts = new KillOptions(); + opts.set_wait_secs(0); + client.killTopologyWithOpts(topologyName, opts); } - @Override - public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { - this.collector = collector; + static void printMetrics(Nimbus.Iface client, String name) throws Exception { + ClusterSummary summary = client.getClusterInfo(); + String id = null; + for (TopologySummary ts : summary.get_topologies()) { + if (name.equals(ts.get_name())) { + id = ts.get_id(); + } + } + if (id == null) { + throw new Exception("Could not find a topology named " + name); + } + TopologyInfo info = client.getTopologyInfo(id); + int uptime = info.get_uptime_secs(); + long acked = 0; + long failed = 0; + double weightedAvgTotal = 0.0; + for (ExecutorSummary exec : info.get_executors()) { + if ("spout".equals(exec.get_component_id())) { + 
SpoutStats stats = exec.get_stats().get_specific().get_spout(); + Map<String, Long> failedMap = stats.get_failed().get(":all-time"); + Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); + Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time"); + for (String key : ackedMap.keySet()) { + if (failedMap != null) { + Long tmp = failedMap.get(key); + if (tmp != null) { + failed += tmp; + } + } + long ackVal = ackedMap.get(key); + double latVal = avgLatMap.get(key) * ackVal; + acked += ackVal; + weightedAvgTotal += latVal; + } + } + } + double avgLatency = weightedAvgTotal / acked; + System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " + + (((double) acked) / uptime + " failed: " + failed)); } - @Override - public void execute(Tuple tuple) { - log.info("Received tuple : {}", tuple.getValue(0)); - count++; - if(count==3) { - collector.fail(tuple); - } - else { - collector.ack(tuple); - } - } + public static class ConstBolt extends BaseRichBolt { + public static final String FIELDS = "message"; + private static final long serialVersionUID = -5313598399155365865L; + private static final Logger log = LoggerFactory.getLogger(ConstBolt.class); + int count = 0; + private OutputCollector collector; - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(FIELDS)); - } - } // class - - /** Copies text file content from sourceDir to destinationDir. 
Moves source files into sourceDir after its done consuming */ - public static void main(String[] args) throws Exception { - // 0 - validate args - if (args.length < 7) { - System.err.println("Please check command line arguments."); - System.err.println("Usage :"); - System.err.println(HdfsSpoutTopology.class.toString() + " topologyName hdfsUri fileFormat sourceDir sourceArchiveDir badDir destinationDir."); - System.err.println(" topologyName - topology name."); - System.err.println(" hdfsUri - hdfs name node URI"); - System.err.println(" fileFormat - Set to 'TEXT' for reading text files or 'SEQ' for sequence files."); - System.err.println(" sourceDir - read files from this HDFS dir using HdfsSpout."); - System.err.println(" archiveDir - after a file in sourceDir is read completely, it is moved to this HDFS location."); - System.err.println(" badDir - files that cannot be read properly will be moved to this HDFS location."); - System.err.println(" spoutCount - Num of spout instances."); - System.err.println(); - System.exit(-1); - } + public ConstBolt() { + } - // 1 - parse cmd line args - String topologyName = args[0]; - String hdfsUri = args[1]; - String fileFormat = args[2]; - String sourceDir = args[3]; - String archiveDir = args[4]; - String badDir = args[5]; - int spoutNum = Integer.parseInt(args[6]); - - // 2 - create and configure spout and bolt - ConstBolt bolt = new ConstBolt(); - - HdfsSpout spout = new HdfsSpout().withOutputFields(TextFileReader.defaultFields) - .setReaderType(fileFormat) - .setHdfsUri(hdfsUri) - .setSourceDir(sourceDir) - .setArchiveDir(archiveDir) - .setBadFilesDir(badDir); - - // 3 - Create and configure topology - Config conf = new Config(); - conf.setNumWorkers(1); - conf.setNumAckers(1); - conf.setMaxTaskParallelism(1); - conf.setDebug(true); - conf.registerMetricsConsumer(LoggingMetricsConsumer.class); - - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout(SPOUT_ID, spout, spoutNum); - builder.setBolt(BOLT_ID, 
bolt, 1).shuffleGrouping(SPOUT_ID); - - // 4 - submit topology, wait for a few min and terminate it - Map<String, Object> clusterConf = Utils.readStormConfig(); - StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); - Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient(); - - // 5 - Print metrics every 30 sec, kill topology after 20 min - for (int i = 0; i < 40; i++) { - Thread.sleep(30 * 1000); - printMetrics(client, topologyName); - } - kill(client, topologyName); - } // main - - private static void kill(Nimbus.Iface client, String topologyName) throws Exception { - KillOptions opts = new KillOptions(); - opts.set_wait_secs(0); - client.killTopologyWithOpts(topologyName, opts); - } - - static void printMetrics(Nimbus.Iface client, String name) throws Exception { - ClusterSummary summary = client.getClusterInfo(); - String id = null; - for (TopologySummary ts: summary.get_topologies()) { - if (name.equals(ts.get_name())) { - id = ts.get_id(); - } - } - if (id == null) { - throw new Exception("Could not find a topology named "+name); - } - TopologyInfo info = client.getTopologyInfo(id); - int uptime = info.get_uptime_secs(); - long acked = 0; - long failed = 0; - double weightedAvgTotal = 0.0; - for (ExecutorSummary exec: info.get_executors()) { - if ("spout".equals(exec.get_component_id())) { - SpoutStats stats = exec.get_stats().get_specific().get_spout(); - Map<String, Long> failedMap = stats.get_failed().get(":all-time"); - Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); - Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time"); - for (String key: ackedMap.keySet()) { - if (failedMap != null) { - Long tmp = failedMap.get(key); - if (tmp != null) { - failed += tmp; + @Override + public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple tuple) { 
+ log.info("Received tuple : {}", tuple.getValue(0)); + count++; + if (count == 3) { + collector.fail(tuple); + } else { + collector.ack(tuple); } - } - long ackVal = ackedMap.get(key); - double latVal = avgLatMap.get(key) * ackVal; - acked += ackVal; - weightedAvgTotal += latVal; } - } - } - double avgLatency = weightedAvgTotal/acked; - System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); - } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(FIELDS)); + } + } // class } http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java index 1553971..29793f1 100644 --- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java +++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java @@ -15,18 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.storm.hdfs.trident; -import org.apache.storm.Config; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Fields; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IBatchSpout; +package org.apache.storm.hdfs.trident; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.spout.IBatchSpout; +import org.apache.storm.tuple.Fields; public class FixedBatchSpout implements IBatchSpout { @@ -34,6 +34,8 @@ public class FixedBatchSpout implements IBatchSpout { List<Object>[] outputs; int maxBatchSize; HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>(); + int index = 0; + boolean cycle = false; public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... 
outputs) { this.fields = fields; @@ -41,9 +43,6 @@ public class FixedBatchSpout implements IBatchSpout { this.maxBatchSize = maxBatchSize; } - int index = 0; - boolean cycle = false; - public void setCycle(boolean cycle) { this.cycle = cycle; } @@ -56,20 +55,20 @@ public class FixedBatchSpout implements IBatchSpout { @Override public void emitBatch(long batchId, TridentCollector collector) { List<List<Object>> batch = this.batches.get(batchId); - if(batch == null){ + if (batch == null) { batch = new ArrayList<List<Object>>(); - if(index>=outputs.length && cycle) { + if (index >= outputs.length && cycle) { index = 0; } - for(int i=0; i < maxBatchSize; index++, i++) { - if(index == outputs.length){ - index=0; + for (int i = 0; i < maxBatchSize; index++, i++) { + if (index == outputs.length) { + index = 0; } batch.add(outputs[index]); } this.batches.put(batchId, batch); } - for(List<Object> list : batch){ + for (List<Object> list : batch) { collector.emit(list); } } http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java index af76c00..e8bf490 100644 --- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java +++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.storm.hdfs.trident; import java.io.FileInputStream; import java.io.InputStream; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; @@ -40,10 +40,12 @@ import org.yaml.snakeyaml.Yaml; public class TridentFileTopology { - public static StormTopology buildTopology(String hdfsUrl){ + public static StormTopology buildTopology(String hdfsUrl) { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 1000, new Values("the cow jumped over the moon", 1l), - new Values("the man went to the store and bought some candy", 2l), new Values("four score and seven years ago", 3l), - new Values("how many apples can you eat", 4l), new Values("to be or not to be the person", 5l)); + new Values("the man went to the store and bought some candy", 2l), + new Values("four score and seven years ago", 3l), + new Values("how many apples can you eat", 4l), + new Values("to be or not to be the person", 5l)); spout.setCycle(true); TridentTopology topology = new TridentTopology(); @@ -52,26 +54,26 @@ public class TridentFileTopology { Fields hdfsFields = new Fields("sentence", "key"); FileNameFormat fileNameFormat = new DefaultFileNameFormat() - .withPath("/tmp/trident") - .withPrefix("trident") - .withExtension(".txt"); + .withPath("/tmp/trident") + .withPrefix("trident") + .withExtension(".txt"); RecordFormat recordFormat = new DelimitedRecordFormat() - .withFields(hdfsFields); + .withFields(hdfsFields); FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB); HdfsState.Options options = new HdfsState.HdfsFileOptions() - .withFileNameFormat(fileNameFormat) - .withRecordFormat(recordFormat) - .withRotationPolicy(rotationPolicy) - .withFsUrl(hdfsUrl) - .withConfigKey("hdfs.config"); + .withFileNameFormat(fileNameFormat) + .withRecordFormat(recordFormat) + .withRotationPolicy(rotationPolicy) + .withFsUrl(hdfsUrl) + 
.withConfigKey("hdfs.config"); StateFactory factory = new HdfsStateFactory().withOptions(options); TridentState state = stream - .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields()); + .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields()); return topology.build(); } http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java index 525770b..15612bd 100644 --- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java +++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.storm.hdfs.trident; import java.io.FileInputStream; import java.io.InputStream; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; @@ -40,10 +40,12 @@ import org.yaml.snakeyaml.Yaml; public class TridentSequenceTopology { - public static StormTopology buildTopology(String hdfsUrl){ + public static StormTopology buildTopology(String hdfsUrl) { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 1000, new Values("the cow jumped over the moon", 1l), - new Values("the man went to the store and bought some candy", 2l), new Values("four score and seven years ago", 3l), - new Values("how many apples can you eat", 4l), new Values("to be or not to be the person", 5l)); + new Values("the man went to the store and bought some candy", 2l), + new Values("four score and seven years ago", 3l), + new Values("how many apples can you eat", 4l), + new Values("to be or not to be the person", 5l)); spout.setCycle(true); TridentTopology topology = new TridentTopology(); @@ -52,23 +54,23 @@ public class TridentSequenceTopology { Fields hdfsFields = new Fields("sentence", "key"); FileNameFormat fileNameFormat = new DefaultFileNameFormat() - .withPath("/tmp/trident") - .withPrefix("trident") - .withExtension(".seq"); + .withPath("/tmp/trident") + .withPrefix("trident") + .withExtension(".seq"); FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB); HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions() - .withFileNameFormat(fileNameFormat) - .withSequenceFormat(new DefaultSequenceFormat("key", "sentence")) - .withRotationPolicy(rotationPolicy) - .withFsUrl(hdfsUrl) - .withConfigKey("hdfs.config") - .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/")); + .withFileNameFormat(fileNameFormat) + .withSequenceFormat(new DefaultSequenceFormat("key", "sentence")) + 
.withRotationPolicy(rotationPolicy) + .withFsUrl(hdfsUrl) + .withConfigKey("hdfs.config") + .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/")); StateFactory factory = new HdfsStateFactory().withOptions(seqOpts); TridentState state = stream - .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields()); + .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields()); return topology.build(); }
