STORM-676 Addressed review comments from Arun
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a96f20f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a96f20f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a96f20f Branch: refs/heads/1.x-branch Commit: 3a96f20f09b3eaccc59d333a581c3b0c7d345ccc Parents: dd02bcf Author: Satish Duggana <[email protected]> Authored: Wed Mar 23 11:35:37 2016 +0530 Committer: Satish Duggana <[email protected]> Committed: Sun Mar 27 10:46:49 2016 +0530 ---------------------------------------------------------------------- examples/storm-starter/pom.xml | 31 ------------ .../TridentHBaseWindowingStoreTopology.java | 49 ++++-------------- .../TridentWindowingInmemoryStoreTopology.java | 53 ++++---------------- .../windowing/HBaseWindowsStoreFactory.java | 3 ++ .../jvm/org/apache/storm/trident/Stream.java | 12 ++--- .../windowing/AbstractTridentWindowManager.java | 1 - .../windowing/InMemoryTridentWindowManager.java | 6 --- .../StoreBasedTridentWindowManager.java | 6 --- 8 files changed, 28 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index e702a5d..6053595 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -35,7 +35,6 @@ <!-- see comment below... This fixes an annoyance with intellij --> <provided.scope>provided</provided.scope> <hbase.version>0.98.4-hadoop2</hbase.version> - <hbase.version>1.1.2</hbase.version> </properties> <profiles> @@ -175,36 +174,6 @@ <artifactId>storm-redis</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <version>${hbase.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - <version>${hbase.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java index 0ebaa1f..ba18f7c 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java @@ -25,13 +25,10 @@ import org.apache.storm.generated.StormTopology; import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.operation.BaseFunction; -import org.apache.storm.trident.operation.Function; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.operation.TridentOperationContext; -import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.operation.Consumer; import org.apache.storm.trident.testing.CountAsAggregator; import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.testing.Split; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.trident.windowing.WindowsStoreFactory; import org.apache.storm.trident.windowing.config.TumblingCountWindow; @@ -42,9 +39,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; -import java.util.Map; /** + * Sample application of trident windowing which uses {@link HBaseWindowsStoreFactory}'s store for storing tuples in window. * */ public class TridentHBaseWindowingStoreTopology { @@ -61,46 +58,20 @@ public class TridentHBaseWindowingStoreTopology { Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")) .window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) -// .tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) - .each(new Fields("count"), new Debug()) - .each(new Fields("count"), new Echo(), new Fields("ct")); + .peek(new Consumer() { + @Override + public void accept(TridentTuple input) { + LOG.info("Received tuple: [{}]", input); + } + }); return topology.build(); } - public static class Split extends BaseFunction { - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - String sentence = tuple.getString(0); - for (String word : sentence.split(" ")) { - collector.emit(new Values(word)); - } - } - } - - static class Echo implements Function { - - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - LOG.info("##########Echo.execute: " + tuple); - collector.emit(tuple.getValues()); - } - - @Override - public void prepare(Map conf, TridentOperationContext context) { - - } - - @Override - public void cleanup() { - - } - } - public static void main(String[] args) throws Exception { Config conf = new Config(); conf.setMaxSpoutPending(20); - conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 2); + conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 100); // window-state table should already be created with cf:tuples column HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8")); http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java index a2455a0..5aec01d 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java @@ -25,21 +25,14 @@ import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.base.BaseWindowedBolt; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.operation.BaseFunction; -import org.apache.storm.trident.operation.Function; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.operation.TridentOperationContext; -import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.operation.Consumer; import org.apache.storm.trident.testing.CountAsAggregator; import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.testing.Split; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; import org.apache.storm.trident.windowing.WindowsStoreFactory; -import org.apache.storm.trident.windowing.config.SlidingCountWindow; -import org.apache.storm.trident.windowing.config.SlidingDurationWindow; -import org.apache.storm.trident.windowing.config.TumblingCountWindow; -import org.apache.storm.trident.windowing.config.TumblingDurationWindow; -import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.trident.windowing.config.*; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; @@ -48,7 +41,6 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -68,43 +60,16 @@ public class TridentWindowingInmemoryStoreTopology { Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")) .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) -// .aggregate(new CountAsAggregator(), new Fields("count")) - .each(new Fields("count"), new Debug()) - .each(new Fields("count"), new Echo(), new Fields("ct")) - .each(new Fields("ct"), new Debug()); + .peek(new Consumer() { + @Override + public void accept(TridentTuple input) { + LOG.info("Received tuple: [{}]", input); + } + }); return topology.build(); } - public static class Split extends BaseFunction { - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - String sentence = tuple.getString(0); - for (String word : sentence.split(" ")) { - collector.emit(new Values(word)); - } - } - } - - static class Echo implements Function { - - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - LOG.info("##########Echo.execute: " + tuple); - collector.emit(tuple.getValues()); - } - - @Override - public void prepare(Map conf, TridentOperationContext context) { - - } - - @Override - public void cleanup() { - - } - } - public static void main(String[] args) throws Exception { Config conf = new Config(); WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory(); http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java index 56fad58..a49bc87 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java @@ -25,6 +25,9 @@ import org.apache.storm.trident.windowing.WindowsStoreFactory; import java.util.Map; +/** + * + */ public class HBaseWindowsStoreFactory implements WindowsStoreFactory { private final Map<String, Object> config; private final String tableName; http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/Stream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java index 47b087a..444b42a 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java +++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> + * * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -629,7 +629,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { * and slides the window with {@code slideCount}. * * @param windowCount represents tuples count of a window - * @param slideCount represents sliding count window + * @param slideCount the number of tuples after which the window slides * @param windowStoreFactory intermediary tuple store for storing windowing tuples * @param inputFields projected fields for aggregator * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream. @@ -663,7 +663,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { * and completes a window at {@code windowDuration} * * @param windowDuration represents window duration configuration - * @param slideDuration represents sliding duration configuration + * @param slidingInterval the time duration after which the window slides * @param windowStoreFactory intermediary tuple store for storing windowing tuples * @param inputFields projected fields for aggregator * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream. @@ -671,9 +671,9 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { * * @return */ - public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration, + public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) { - return window(SlidingDurationWindow.of(windowDuration, slideDuration), windowStoreFactory, inputFields, aggregator, functionFields); + return window(SlidingDurationWindow.of(windowDuration, slidingInterval), windowStoreFactory, inputFields, aggregator, functionFields); } /** http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java index fd7a957..aac18d3 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java @@ -55,7 +55,6 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM protected final String windowTaskId; protected final WindowsStore windowStore; - protected final Set<String> activeBatches = new HashSet<>(); protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>(); protected final AtomicInteger triggerId = new AtomicInteger(); private final String windowTriggerCountId; http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java index e47cc9a..69eb39e 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java @@ -56,12 +56,6 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T } public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) { - // check if they are already added then ignore these tuples. This batch is replayed. - if (activeBatches.contains(getBatchTxnId(batchId))) { - LOG.info("Ignoring already added tuples with batch: [{}]", batchId); - return; - } - LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId); for (TridentTuple tridentTuple : tuples) { windowManager.add(tridentTuple); http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java index 58b24a2..87c1a0f 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java @@ -129,12 +129,6 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager } public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) { - // check if they are already added then ignore these tuples. This batch is replayed. - if (activeBatches.contains(getBatchTxnId(batchId))) { - LOG.info("Ignoring already added tuples with batch: [{}]", batchId); - return; - } - LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId); List<WindowsStore.Entry> entries = new ArrayList<>(); for (int i = 0; i < tuples.size(); i++) {
