STORM-676 Addressed review comments from Arun

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a96f20f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a96f20f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a96f20f

Branch: refs/heads/1.x-branch
Commit: 3a96f20f09b3eaccc59d333a581c3b0c7d345ccc
Parents: dd02bcf
Author: Satish Duggana <[email protected]>
Authored: Wed Mar 23 11:35:37 2016 +0530
Committer: Satish Duggana <[email protected]>
Committed: Sun Mar 27 10:46:49 2016 +0530

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  | 31 ------------
 .../TridentHBaseWindowingStoreTopology.java     | 49 ++++--------------
 .../TridentWindowingInmemoryStoreTopology.java  | 53 ++++----------------
 .../windowing/HBaseWindowsStoreFactory.java     |  3 ++
 .../jvm/org/apache/storm/trident/Stream.java    | 12 ++---
 .../windowing/AbstractTridentWindowManager.java |  1 -
 .../windowing/InMemoryTridentWindowManager.java |  6 ---
 .../StoreBasedTridentWindowManager.java         |  6 ---
 8 files changed, 28 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index e702a5d..6053595 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -35,7 +35,6 @@
     <!-- see comment below... This fixes an annoyance with intellij -->
     <provided.scope>provided</provided.scope>
     <hbase.version>0.98.4-hadoop2</hbase.version>
-    <hbase.version>1.1.2</hbase.version>
   </properties>
 
   <profiles>
@@ -175,36 +174,6 @@
       <artifactId>storm-redis</artifactId>
       <version>${project.version}</version>
     </dependency>
-      <dependency>
-          <groupId>org.apache.hbase</groupId>
-          <artifactId>hbase-server</artifactId>
-          <version>${hbase.version}</version>
-          <exclusions>
-              <exclusion>
-                  <groupId>org.slf4j</groupId>
-                  <artifactId>slf4j-log4j12</artifactId>
-              </exclusion>
-              <exclusion>
-                  <groupId>org.apache.zookeeper</groupId>
-                  <artifactId>zookeeper</artifactId>
-              </exclusion>
-          </exclusions>
-      </dependency>
-      <dependency>
-          <groupId>org.apache.hbase</groupId>
-          <artifactId>hbase-client</artifactId>
-          <version>${hbase.version}</version>
-          <exclusions>
-              <exclusion>
-                  <groupId>org.slf4j</groupId>
-                  <artifactId>slf4j-log4j12</artifactId>
-              </exclusion>
-              <exclusion>
-                  <groupId>org.apache.zookeeper</groupId>
-                  <artifactId>zookeeper</artifactId>
-              </exclusion>
-          </exclusions>
-      </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index 0ebaa1f..ba18f7c 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -25,13 +25,10 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.Function;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.operation.Consumer;
 import org.apache.storm.trident.testing.CountAsAggregator;
 import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.Split;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.windowing.WindowsStoreFactory;
 import org.apache.storm.trident.windowing.config.TumblingCountWindow;
@@ -42,9 +39,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.Map;
 
 /**
+ * Sample application of trident windowing which uses {@link HBaseWindowsStoreFactory}'s store for storing tuples in window.
  *
  */
 public class TridentHBaseWindowingStoreTopology {
@@ -61,46 +58,20 @@ public class TridentHBaseWindowingStoreTopology {
         Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
                 new Split(), new Fields("word"))
                 .window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
-//                .tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
-                .each(new Fields("count"), new Debug())
-                .each(new Fields("count"), new Echo(), new Fields("ct"));
+                .peek(new Consumer() {
+                    @Override
+                    public void accept(TridentTuple input) {
+                        LOG.info("Received tuple: [{}]", input);
+                    }
+                });
 
         return topology.build();
     }
 
-    public static class Split extends BaseFunction {
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word : sentence.split(" ")) {
-                collector.emit(new Values(word));
-            }
-        }
-    }
-
-    static class Echo implements Function {
-
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            LOG.info("##########Echo.execute: " + tuple);
-            collector.emit(tuple.getValues());
-        }
-
-        @Override
-        public void prepare(Map conf, TridentOperationContext context) {
-
-        }
-
-        @Override
-        public void cleanup() {
-
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         Config conf = new Config();
         conf.setMaxSpoutPending(20);
-        conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 2);
+        conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 100);
 
         // window-state table should already be created with cf:tuples column
         HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index a2455a0..5aec01d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -25,21 +25,14 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.Function;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.operation.Consumer;
 import org.apache.storm.trident.testing.CountAsAggregator;
 import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.Split;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
 import org.apache.storm.trident.windowing.WindowsStoreFactory;
-import org.apache.storm.trident.windowing.config.SlidingCountWindow;
-import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
-import org.apache.storm.trident.windowing.config.TumblingCountWindow;
-import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
-import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.trident.windowing.config.*;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
@@ -48,7 +41,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -68,43 +60,16 @@ public class TridentWindowingInmemoryStoreTopology {
         Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
                 new Split(), new Fields("word"))
                 .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
-//                .aggregate(new CountAsAggregator(), new Fields("count"))
-                .each(new Fields("count"), new Debug())
-                .each(new Fields("count"), new Echo(), new Fields("ct"))
-                .each(new Fields("ct"), new Debug());
+                .peek(new Consumer() {
+                    @Override
+                    public void accept(TridentTuple input) {
+                        LOG.info("Received tuple: [{}]", input);
+                    }
+                });
 
         return topology.build();
     }
 
-    public static class Split extends BaseFunction {
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word : sentence.split(" ")) {
-                collector.emit(new Values(word));
-            }
-        }
-    }
-
-    static class Echo implements Function {
-
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            LOG.info("##########Echo.execute: " + tuple);
-            collector.emit(tuple.getValues());
-        }
-
-        @Override
-        public void prepare(Map conf, TridentOperationContext context) {
-
-        }
-
-        @Override
-        public void cleanup() {
-
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         Config conf = new Config();
         WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory();

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index 56fad58..a49bc87 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -25,6 +25,9 @@ import org.apache.storm.trident.windowing.WindowsStoreFactory;
 
 import java.util.Map;
 
+/**
+ *
+ */
 public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
     private final Map<String, Object> config;
     private final String tableName;

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 47b087a..444b42a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -629,7 +629,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * and slides the window with {@code slideCount}.
      *
      * @param windowCount represents tuples count of a window
-     * @param slideCount represents sliding count window
+     * @param slideCount the number of tuples after which the window slides
      * @param windowStoreFactory intermediary tuple store for storing windowing tuples
      * @param inputFields projected fields for aggregator
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
@@ -663,7 +663,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * and completes a window at {@code windowDuration}
      *
      * @param windowDuration represents window duration configuration
-     * @param slideDuration represents sliding duration  configuration
+     * @param slidingInterval the time duration after which the window slides
      * @param windowStoreFactory intermediary tuple store for storing windowing tuples
      * @param inputFields projected fields for aggregator
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
@@ -671,9 +671,9 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      *
      * @return
      */
-    public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
+    public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
                                     WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
-        return window(SlidingDurationWindow.of(windowDuration, slideDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+        return window(SlidingDurationWindow.of(windowDuration, slidingInterval), windowStoreFactory, inputFields, aggregator, functionFields);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index fd7a957..aac18d3 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -55,7 +55,6 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
     protected final String windowTaskId;
     protected final WindowsStore windowStore;
 
-    protected final Set<String> activeBatches = new HashSet<>();
     protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();
     protected final AtomicInteger triggerId = new AtomicInteger();
     private final String windowTriggerCountId;

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
index e47cc9a..69eb39e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -56,12 +56,6 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
     }
 
     public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
-        // check if they are already added then ignore these tuples. This batch is replayed.
-        if (activeBatches.contains(getBatchTxnId(batchId))) {
-            LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
-            return;
-        }
-
         LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
         for (TridentTuple tridentTuple : tuples) {
             windowManager.add(tridentTuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
index 58b24a2..87c1a0f 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -129,12 +129,6 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
     }
 
     public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
-        // check if they are already added then ignore these tuples. This batch is replayed.
-        if (activeBatches.contains(getBatchTxnId(batchId))) {
-            LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
-            return;
-        }
-
         LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
         List<WindowsStore.Entry> entries = new ArrayList<>();
         for (int i = 0; i < tuples.size(); i++) {

Reply via email to