Fixing stylecheck problems with storm-hdfs-examples

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e8ffac45
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e8ffac45
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e8ffac45

Branch: refs/heads/master
Commit: e8ffac458baf6cc7a90dd0ced78cc01f0ffb4229
Parents: f7f3524
Author: Kishor Patil <[email protected]>
Authored: Mon Apr 23 00:44:09 2018 -0400
Committer: Kishor Patil <[email protected]>
Committed: Mon Apr 23 02:32:42 2018 -0400

----------------------------------------------------------------------
 examples/storm-hdfs-examples/pom.xml            |   2 +-
 .../storm/hdfs/bolt/HdfsFileTopology.java       |  54 ++--
 .../storm/hdfs/bolt/SequenceFileTopology.java   |  58 ++--
 .../storm/hdfs/spout/HdfsSpoutTopology.java     | 307 +++++++++----------
 .../storm/hdfs/trident/FixedBatchSpout.java     |  29 +-
 .../storm/hdfs/trident/TridentFileTopology.java |  30 +-
 .../hdfs/trident/TridentSequenceTopology.java   |  30 +-
 7 files changed, 248 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/pom.xml b/examples/storm-hdfs-examples/pom.xml
index c2271fe..30b07e6 100644
--- a/examples/storm-hdfs-examples/pom.xml
+++ b/examples/storm-hdfs-examples/pom.xml
@@ -89,7 +89,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>224</maxAllowedViolations>
+                    <maxAllowedViolations>29</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
index f8d69ca..e3599d5 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.bolt;
 
 import java.io.FileInputStream;
@@ -23,7 +18,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
@@ -65,12 +59,12 @@ public class HdfsFileTopology {
         FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, 
TimedRotationPolicy.TimeUnit.MINUTES);
 
         FileNameFormat fileNameFormat = new DefaultFileNameFormat()
-                .withPath("/tmp/foo/")
-                .withExtension(".txt");
+            .withPath("/tmp/foo/")
+            .withExtension(".txt");
 
         // use "|" instead of "," for field delimiter
         RecordFormat format = new DelimitedRecordFormat()
-                .withFieldDelimiter("|");
+            .withFieldDelimiter("|");
 
         Yaml yaml = new Yaml();
         InputStream in = new FileInputStream(args[1]);
@@ -79,20 +73,20 @@ public class HdfsFileTopology {
         config.put("hdfs.config", yamlConf);
 
         HdfsBolt bolt = new HdfsBolt()
-                .withConfigKey("hdfs.config")
-                .withFsUrl(args[0])
-                .withFileNameFormat(fileNameFormat)
-                .withRecordFormat(format)
-                .withRotationPolicy(rotationPolicy)
-                .withSyncPolicy(syncPolicy)
-                .addRotationAction(new 
MoveFileAction().toDestination("/tmp/dest2/"));
+            .withConfigKey("hdfs.config")
+            .withFsUrl(args[0])
+            .withFileNameFormat(fileNameFormat)
+            .withRecordFormat(format)
+            .withRotationPolicy(rotationPolicy)
+            .withSyncPolicy(syncPolicy)
+            .addRotationAction(new 
MoveFileAction().toDestination("/tmp/dest2/"));
 
         TopologyBuilder builder = new TopologyBuilder();
 
         builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
         // SentenceSpout --> MyBolt
         builder.setBolt(BOLT_ID, bolt, 4)
-                .shuffleGrouping(SENTENCE_SPOUT_ID);
+               .shuffleGrouping(SENTENCE_SPOUT_ID);
         String topoName = TOPOLOGY_NAME;
         if (args.length == 3) {
             topoName = args[2];
@@ -114,11 +108,11 @@ public class HdfsFileTopology {
         private ConcurrentHashMap<UUID, Values> pending;
         private SpoutOutputCollector collector;
         private String[] sentences = {
-                "my dog has fleas",
-                "i like cold beverages",
-                "the dog ate my homework",
-                "don't have a cow man",
-                "i don't think i like fleas"
+            "my dog has fleas",
+            "i like cold beverages",
+            "the dog ate my homework",
+            "don't have a cow man",
+            "i don't think i like fleas"
         };
         private int index = 0;
         private int count = 0;
@@ -145,7 +139,7 @@ public class HdfsFileTopology {
             }
             count++;
             total++;
-            if(count > 20000){
+            if (count > 20000) {
                 count = 0;
                 System.out.println("Pending count: " + this.pending.size() + 
", total: " + this.total);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
index 2707af5..8ddd045 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hdfs.bolt;
 
 import java.io.FileInputStream;
@@ -23,7 +18,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
@@ -66,8 +60,8 @@ public class SequenceFileTopology {
         FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, 
Units.MB);
 
         FileNameFormat fileNameFormat = new DefaultFileNameFormat()
-                .withPath("/tmp/source/")
-                .withExtension(".seq");
+            .withPath("/tmp/source/")
+            .withExtension(".seq");
 
         // create sequence format instance.
         DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", 
"sentence");
@@ -79,22 +73,22 @@ public class SequenceFileTopology {
         config.put("hdfs.config", yamlConf);
 
         SequenceFileBolt bolt = new SequenceFileBolt()
-                .withFsUrl(args[0])
-                .withConfigKey("hdfs.config")
-                .withFileNameFormat(fileNameFormat)
-                .withSequenceFormat(format)
-                .withRotationPolicy(rotationPolicy)
-                .withSyncPolicy(syncPolicy)
-                .withCompressionType(SequenceFile.CompressionType.RECORD)
-                .withCompressionCodec("deflate")
-                .addRotationAction(new 
MoveFileAction().toDestination("/tmp/dest/"));
+            .withFsUrl(args[0])
+            .withConfigKey("hdfs.config")
+            .withFileNameFormat(fileNameFormat)
+            .withSequenceFormat(format)
+            .withRotationPolicy(rotationPolicy)
+            .withSyncPolicy(syncPolicy)
+            .withCompressionType(SequenceFile.CompressionType.RECORD)
+            .withCompressionCodec("deflate")
+            .addRotationAction(new 
MoveFileAction().toDestination("/tmp/dest/"));
 
         TopologyBuilder builder = new TopologyBuilder();
 
         builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
         // SentenceSpout --> MyBolt
         builder.setBolt(BOLT_ID, bolt, 4)
-                .shuffleGrouping(SENTENCE_SPOUT_ID);
+               .shuffleGrouping(SENTENCE_SPOUT_ID);
 
         String topoName = TOPOLOGY_NAME;
         if (args.length == 3) {
@@ -120,11 +114,11 @@ public class SequenceFileTopology {
         private ConcurrentHashMap<UUID, Values> pending;
         private SpoutOutputCollector collector;
         private String[] sentences = {
-                "my dog has fleas",
-                "i like cold beverages",
-                "the dog ate my homework",
-                "don't have a cow man",
-                "i don't think i like fleas"
+            "my dog has fleas",
+            "i like cold beverages",
+            "the dog ate my homework",
+            "don't have a cow man",
+            "i don't think i like fleas"
         };
         private int index = 0;
         private int count = 0;
@@ -151,7 +145,7 @@ public class SequenceFileTopology {
             }
             count++;
             total++;
-            if(count > 20000){
+            if (count > 20000) {
                 count = 0;
                 System.out.println("Pending count: " + this.pending.size() + 
", total: " + this.total);
             }
@@ -159,7 +153,7 @@ public class SequenceFileTopology {
         }
 
         public void ack(Object msgId) {
-//            System.out.println("ACK");
+            //            System.out.println("ACK");
             this.pending.remove(msgId);
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
index 943ca60..e3282d8 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
@@ -1,192 +1,187 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hdfs.spout;
 
+import java.util.Map;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ExecutorSummary;
 import org.apache.storm.generated.KillOptions;
 import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutStats;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologySummary;
-import org.apache.storm.generated.ExecutorSummary;
-import org.apache.storm.generated.SpoutStats;
-import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.metric.LoggingMetricsConsumer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 
 public class HdfsSpoutTopology {
 
-  public static final String SPOUT_ID = "hdfsspout";
-  public static final String BOLT_ID = "constbolt";
-
+    public static final String SPOUT_ID = "hdfsspout";
+    public static final String BOLT_ID = "constbolt";
+
+    /**
+     * Copies text file content from sourceDir to destinationDir. Moves source 
files into sourceDir after its done consuming
+     */
+    public static void main(String[] args) throws Exception {
+        // 0 - validate args
+        if (args.length < 7) {
+            System.err.println("Please check command line arguments.");
+            System.err.println("Usage :");
+            System.err.println(
+                HdfsSpoutTopology.class.toString() + " topologyName hdfsUri 
fileFormat sourceDir sourceArchiveDir badDir destinationDir.");
+            System.err.println(" topologyName - topology name.");
+            System.err.println(" hdfsUri - hdfs name node URI");
+            System.err.println(" fileFormat -  Set to 'TEXT' for reading text 
files or 'SEQ' for sequence files.");
+            System.err.println(" sourceDir  - read files from this HDFS dir 
using HdfsSpout.");
+            System.err.println(" archiveDir - after a file in sourceDir is 
read completely, it is moved to this HDFS location.");
+            System.err.println(" badDir - files that cannot be read properly 
will be moved to this HDFS location.");
+            System.err.println(" spoutCount - Num of spout instances.");
+            System.err.println();
+            System.exit(-1);
+        }
 
-  public static class ConstBolt extends BaseRichBolt {
-    private static final long serialVersionUID = -5313598399155365865L;
-    public static final String FIELDS = "message";
-    private OutputCollector collector;
-    private static final Logger log = LoggerFactory.getLogger(ConstBolt.class);
-    int count =0;
+        // 1 - parse cmd line args
+        String topologyName = args[0];
+        String hdfsUri = args[1];
+        String fileFormat = args[2];
+        String sourceDir = args[3];
+        String archiveDir = args[4];
+        String badDir = args[5];
+        int spoutNum = Integer.parseInt(args[6]);
+
+        // 2 - create and configure spout and bolt
+        ConstBolt bolt = new ConstBolt();
+
+        HdfsSpout spout = new 
HdfsSpout().withOutputFields(TextFileReader.defaultFields)
+                                         .setReaderType(fileFormat)
+                                         .setHdfsUri(hdfsUri)
+                                         .setSourceDir(sourceDir)
+                                         .setArchiveDir(archiveDir)
+                                         .setBadFilesDir(badDir);
+
+        // 3 - Create and configure topology
+        Config conf = new Config();
+        conf.setNumWorkers(1);
+        conf.setNumAckers(1);
+        conf.setMaxTaskParallelism(1);
+        conf.setDebug(true);
+        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout, spoutNum);
+        builder.setBolt(BOLT_ID, bolt, 1).shuffleGrouping(SPOUT_ID);
+
+        // 4 - submit topology, wait for a few min and terminate it
+        Map<String, Object> clusterConf = Utils.readStormConfig();
+        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, 
builder.createTopology());
+        Nimbus.Iface client = 
NimbusClient.getConfiguredClient(clusterConf).getClient();
+
+        // 5 - Print metrics every 30 sec, kill topology after 20 min
+        for (int i = 0; i < 40; i++) {
+            Thread.sleep(30 * 1000);
+            printMetrics(client, topologyName);
+        }
+        kill(client, topologyName);
+    } // main
 
-    public ConstBolt() {
+    private static void kill(Nimbus.Iface client, String topologyName) throws 
Exception {
+        KillOptions opts = new KillOptions();
+        opts.set_wait_secs(0);
+        client.killTopologyWithOpts(topologyName, opts);
     }
 
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, 
OutputCollector collector) {
-      this.collector = collector;
+    static void printMetrics(Nimbus.Iface client, String name) throws 
Exception {
+        ClusterSummary summary = client.getClusterInfo();
+        String id = null;
+        for (TopologySummary ts : summary.get_topologies()) {
+            if (name.equals(ts.get_name())) {
+                id = ts.get_id();
+            }
+        }
+        if (id == null) {
+            throw new Exception("Could not find a topology named " + name);
+        }
+        TopologyInfo info = client.getTopologyInfo(id);
+        int uptime = info.get_uptime_secs();
+        long acked = 0;
+        long failed = 0;
+        double weightedAvgTotal = 0.0;
+        for (ExecutorSummary exec : info.get_executors()) {
+            if ("spout".equals(exec.get_component_id())) {
+                SpoutStats stats = exec.get_stats().get_specific().get_spout();
+                Map<String, Long> failedMap = 
stats.get_failed().get(":all-time");
+                Map<String, Long> ackedMap = 
stats.get_acked().get(":all-time");
+                Map<String, Double> avgLatMap = 
stats.get_complete_ms_avg().get(":all-time");
+                for (String key : ackedMap.keySet()) {
+                    if (failedMap != null) {
+                        Long tmp = failedMap.get(key);
+                        if (tmp != null) {
+                            failed += tmp;
+                        }
+                    }
+                    long ackVal = ackedMap.get(key);
+                    double latVal = avgLatMap.get(key) * ackVal;
+                    acked += ackVal;
+                    weightedAvgTotal += latVal;
+                }
+            }
+        }
+        double avgLatency = weightedAvgTotal / acked;
+        System.out.println("uptime: " + uptime + " acked: " + acked + " 
avgLatency: " + avgLatency + " acked/sec: " +
+                           (((double) acked) / uptime + " failed: " + failed));
     }
 
-    @Override
-    public void execute(Tuple tuple) {
-      log.info("Received tuple : {}", tuple.getValue(0));
-      count++;
-      if(count==3) {
-        collector.fail(tuple);
-      }
-      else {
-        collector.ack(tuple);
-      }
-    }
+    public static class ConstBolt extends BaseRichBolt {
+        public static final String FIELDS = "message";
+        private static final long serialVersionUID = -5313598399155365865L;
+        private static final Logger log = 
LoggerFactory.getLogger(ConstBolt.class);
+        int count = 0;
+        private OutputCollector collector;
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields(FIELDS));
-    }
-  } // class
-
-  /** Copies text file content from sourceDir to destinationDir. Moves source 
files into sourceDir after its done consuming */
-  public static void main(String[] args) throws Exception {
-    // 0 - validate args
-    if (args.length < 7) {
-      System.err.println("Please check command line arguments.");
-      System.err.println("Usage :");
-      System.err.println(HdfsSpoutTopology.class.toString() + " topologyName 
hdfsUri fileFormat sourceDir sourceArchiveDir badDir destinationDir.");
-      System.err.println(" topologyName - topology name.");
-      System.err.println(" hdfsUri - hdfs name node URI");
-      System.err.println(" fileFormat -  Set to 'TEXT' for reading text files 
or 'SEQ' for sequence files.");
-      System.err.println(" sourceDir  - read files from this HDFS dir using 
HdfsSpout.");
-      System.err.println(" archiveDir - after a file in sourceDir is read 
completely, it is moved to this HDFS location.");
-      System.err.println(" badDir - files that cannot be read properly will be 
moved to this HDFS location.");
-      System.err.println(" spoutCount - Num of spout instances.");
-      System.err.println();
-      System.exit(-1);
-    }
+        public ConstBolt() {
+        }
 
-    // 1 - parse cmd line args
-    String topologyName = args[0];
-    String hdfsUri = args[1];
-    String fileFormat = args[2];
-    String sourceDir = args[3];
-    String archiveDir = args[4];
-    String badDir = args[5];
-    int spoutNum = Integer.parseInt(args[6]);
-
-    // 2 - create and configure spout and bolt
-    ConstBolt bolt = new ConstBolt();
-
-    HdfsSpout spout =  new 
HdfsSpout().withOutputFields(TextFileReader.defaultFields)
-                                      .setReaderType(fileFormat)
-                                      .setHdfsUri(hdfsUri)
-                                      .setSourceDir(sourceDir)
-                                      .setArchiveDir(archiveDir)
-                                      .setBadFilesDir(badDir);
-
-    // 3 - Create and configure topology
-    Config conf = new Config();
-    conf.setNumWorkers(1);
-    conf.setNumAckers(1);
-    conf.setMaxTaskParallelism(1);
-    conf.setDebug(true);
-    conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
-
-    TopologyBuilder builder = new TopologyBuilder();
-    builder.setSpout(SPOUT_ID, spout, spoutNum);
-    builder.setBolt(BOLT_ID, bolt, 1).shuffleGrouping(SPOUT_ID);
-
-    // 4 - submit topology, wait for a few min and terminate it
-    Map<String, Object> clusterConf = Utils.readStormConfig();
-    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, 
builder.createTopology());
-    Nimbus.Iface client = 
NimbusClient.getConfiguredClient(clusterConf).getClient();
-
-    // 5 - Print metrics every 30 sec, kill topology after 20 min
-    for (int i = 0; i < 40; i++) {
-      Thread.sleep(30 * 1000);
-      printMetrics(client, topologyName);
-    }
-    kill(client, topologyName);
-  } // main
-
-  private static void kill(Nimbus.Iface client, String topologyName) throws 
Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    client.killTopologyWithOpts(topologyName, opts);
-  }
-
-  static void printMetrics(Nimbus.Iface client, String name) throws Exception {
-    ClusterSummary summary = client.getClusterInfo();
-    String id = null;
-    for (TopologySummary ts: summary.get_topologies()) {
-      if (name.equals(ts.get_name())) {
-        id = ts.get_id();
-      }
-    }
-    if (id == null) {
-      throw new Exception("Could not find a topology named "+name);
-    }
-    TopologyInfo info = client.getTopologyInfo(id);
-    int uptime = info.get_uptime_secs();
-    long acked = 0;
-    long failed = 0;
-    double weightedAvgTotal = 0.0;
-    for (ExecutorSummary exec: info.get_executors()) {
-      if ("spout".equals(exec.get_component_id())) {
-        SpoutStats stats = exec.get_stats().get_specific().get_spout();
-        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-        Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-        Map<String, Double> avgLatMap = 
stats.get_complete_ms_avg().get(":all-time");
-        for (String key: ackedMap.keySet()) {
-          if (failedMap != null) {
-            Long tmp = failedMap.get(key);
-            if (tmp != null) {
-              failed += tmp;
+        @Override
+        public void prepare(Map<String, Object> conf, TopologyContext context, 
OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            log.info("Received tuple : {}", tuple.getValue(0));
+            count++;
+            if (count == 3) {
+                collector.fail(tuple);
+            } else {
+                collector.ack(tuple);
             }
-          }
-          long ackVal = ackedMap.get(key);
-          double latVal = avgLatMap.get(key) * ackVal;
-          acked += ackVal;
-          weightedAvgTotal += latVal;
         }
-      }
-    }
-    double avgLatency = weightedAvgTotal/acked;
-    System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: 
"+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
-  }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(FIELDS));
+        }
+    } // class
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java
index 1553971..29793f1 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java
@@ -15,18 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.hdfs.trident;
 
-import org.apache.storm.Config;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
+package org.apache.storm.hdfs.trident;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
 
 public class FixedBatchSpout implements IBatchSpout {
 
@@ -34,6 +34,8 @@ public class FixedBatchSpout implements IBatchSpout {
     List<Object>[] outputs;
     int maxBatchSize;
     HashMap<Long, List<List<Object>>> batches = new HashMap<Long, 
List<List<Object>>>();
+    int index = 0;
+    boolean cycle = false;
 
     public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... 
outputs) {
         this.fields = fields;
@@ -41,9 +43,6 @@ public class FixedBatchSpout implements IBatchSpout {
         this.maxBatchSize = maxBatchSize;
     }
 
-    int index = 0;
-    boolean cycle = false;
-
     public void setCycle(boolean cycle) {
         this.cycle = cycle;
     }
@@ -56,20 +55,20 @@ public class FixedBatchSpout implements IBatchSpout {
     @Override
     public void emitBatch(long batchId, TridentCollector collector) {
         List<List<Object>> batch = this.batches.get(batchId);
-        if(batch == null){
+        if (batch == null) {
             batch = new ArrayList<List<Object>>();
-            if(index>=outputs.length && cycle) {
+            if (index >= outputs.length && cycle) {
                 index = 0;
             }
-            for(int i=0; i < maxBatchSize; index++, i++) {
-                if(index == outputs.length){
-                    index=0;
+            for (int i = 0; i < maxBatchSize; index++, i++) {
+                if (index == outputs.length) {
+                    index = 0;
                 }
                 batch.add(outputs[index]);
             }
             this.batches.put(batchId, batch);
         }
-        for(List<Object> list : batch){
+        for (List<Object> list : batch) {
             collector.emit(list);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
index af76c00..e8bf490 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident;
 
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
@@ -40,10 +40,12 @@ import org.yaml.snakeyaml.Yaml;
 
 public class TridentFileTopology {
 
-    public static StormTopology buildTopology(String hdfsUrl){
+    public static StormTopology buildTopology(String hdfsUrl) {
         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 1000, new Values("the cow jumped over the moon", 1l),
-                new Values("the man went to the store and bought some candy", 2l), new Values("four score and seven years ago", 3l),
-                new Values("how many apples can you eat", 4l), new Values("to be or not to be the person", 5l));
+                                                    new Values("the man went to the store and bought some candy", 2l),
+                                                    new Values("four score and seven years ago", 3l),
+                                                    new Values("how many apples can you eat", 4l),
+                                                    new Values("to be or not to be the person", 5l));
         spout.setCycle(true);
 
         TridentTopology topology = new TridentTopology();
@@ -52,26 +54,26 @@ public class TridentFileTopology {
         Fields hdfsFields = new Fields("sentence", "key");
 
         FileNameFormat fileNameFormat = new DefaultFileNameFormat()
-                .withPath("/tmp/trident")
-                .withPrefix("trident")
-                .withExtension(".txt");
+            .withPath("/tmp/trident")
+            .withPrefix("trident")
+            .withExtension(".txt");
 
         RecordFormat recordFormat = new DelimitedRecordFormat()
-                .withFields(hdfsFields);
+            .withFields(hdfsFields);
 
         FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
 
         HdfsState.Options options = new HdfsState.HdfsFileOptions()
-                .withFileNameFormat(fileNameFormat)
-                .withRecordFormat(recordFormat)
-                .withRotationPolicy(rotationPolicy)
-                .withFsUrl(hdfsUrl)
-                .withConfigKey("hdfs.config");
+            .withFileNameFormat(fileNameFormat)
+            .withRecordFormat(recordFormat)
+            .withRotationPolicy(rotationPolicy)
+            .withFsUrl(hdfsUrl)
+            .withConfigKey("hdfs.config");
 
         StateFactory factory = new HdfsStateFactory().withOptions(options);
 
         TridentState state = stream
-                .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
+            .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
 
         return topology.build();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/e8ffac45/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
index 525770b..15612bd 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hdfs.trident;
 
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
@@ -40,10 +40,12 @@ import org.yaml.snakeyaml.Yaml;
 
 public class TridentSequenceTopology {
 
-    public static StormTopology buildTopology(String hdfsUrl){
+    public static StormTopology buildTopology(String hdfsUrl) {
         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 1000, new Values("the cow jumped over the moon", 1l),
-                new Values("the man went to the store and bought some candy", 2l), new Values("four score and seven years ago", 3l),
-                new Values("how many apples can you eat", 4l), new Values("to be or not to be the person", 5l));
+                                                    new Values("the man went to the store and bought some candy", 2l),
+                                                    new Values("four score and seven years ago", 3l),
+                                                    new Values("how many apples can you eat", 4l),
+                                                    new Values("to be or not to be the person", 5l));
         spout.setCycle(true);
 
         TridentTopology topology = new TridentTopology();
@@ -52,23 +54,23 @@ public class TridentSequenceTopology {
         Fields hdfsFields = new Fields("sentence", "key");
 
         FileNameFormat fileNameFormat = new DefaultFileNameFormat()
-                .withPath("/tmp/trident")
-                .withPrefix("trident")
-                .withExtension(".seq");
+            .withPath("/tmp/trident")
+            .withPrefix("trident")
+            .withExtension(".seq");
 
         FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
 
         HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
-                .withFileNameFormat(fileNameFormat)
-                .withSequenceFormat(new DefaultSequenceFormat("key", "sentence"))
-                .withRotationPolicy(rotationPolicy)
-                .withFsUrl(hdfsUrl)
-                .withConfigKey("hdfs.config")
-                .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/"));
+            .withFileNameFormat(fileNameFormat)
+            .withSequenceFormat(new DefaultSequenceFormat("key", "sentence"))
+            .withRotationPolicy(rotationPolicy)
+            .withFsUrl(hdfsUrl)
+            .withConfigKey("hdfs.config")
+            .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/"));
         StateFactory factory = new HdfsStateFactory().withOptions(seqOpts);
 
         TridentState state = stream
-                .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
+            .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
 
         return topology.build();
     }

Reply via email to