STORM-676 Addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b08d7eaf Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b08d7eaf Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b08d7eaf Branch: refs/heads/1.x-branch Commit: b08d7eaf7099e4da74010501a189818ec11b00bc Parents: 3a96f20 Author: Satish Duggana <[email protected]> Authored: Wed Mar 23 19:03:55 2016 +0530 Committer: Satish Duggana <[email protected]> Committed: Sun Mar 27 10:47:03 2016 +0530 ---------------------------------------------------------------------- .../trident/windowing/HBaseWindowsStore.java | 2 +- .../windowing/HBaseWindowsStoreFactory.java | 1 + .../jvm/org/apache/storm/trident/Stream.java | 22 +++++++++++--------- .../windowing/WindowTridentProcessor.java | 2 +- 4 files changed, 15 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java index ff3fbf9..b300ed6 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java @@ -55,7 +55,7 @@ public class HBaseWindowsStore implements WindowsStore { public static final String UTF_8 = "utf-8"; private final ThreadLocal<HTable> threadLocalHtable; - private Queue<HTable> htables = new ConcurrentLinkedQueue<>(); + private final Queue<HTable> htables = new ConcurrentLinkedQueue<>(); private final byte[] family; private final byte[] qualifier; http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java index a49bc87..a47d5fb 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java @@ -26,6 +26,7 @@ import org.apache.storm.trident.windowing.WindowsStoreFactory; import java.util.Map; /** + * Factory to create {@link HBaseWindowsStore} instances. * */ public class HBaseWindowsStoreFactory implements WindowsStoreFactory { http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/storm-core/src/jvm/org/apache/storm/trident/Stream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java index 444b42a..23ac34a 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java +++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java @@ -611,13 +611,13 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { /** * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples. * - * @param windowCount represents no of tuples in the window + * @param windowCount represents number of tuples in the window * @param windowStoreFactory intermediary tuple store for storing windowing tuples * @param inputFields projected fields for aggregator * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream. * @param functionFields fields of values to emit with aggregation. * - * @return + * @return the new stream with this operation. */ public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) { @@ -626,7 +626,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { /** * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples - * and slides the window with {@code slideCount}. + * and slides the window after {@code slideCount}. * * @param windowCount represents tuples count of a window * @param slideCount the number of tuples after which the window slides @@ -635,7 +635,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream. * @param functionFields fields of values to emit with aggregation. * - * @return + * @return the new stream with this operation. */ public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) { @@ -643,7 +643,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { } /** - * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration} + * Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration} * * @param windowDuration represents tumbling window duration configuration * @param windowStoreFactory intermediary tuple store for storing windowing tuples @@ -651,7 +651,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream. * @param functionFields fields of values to emit with aggregation. * - * @return + * @return the new stream with this operation. */ public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) { @@ -659,7 +659,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { } /** - * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration} + * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval} * and completes a window at {@code windowDuration} * * @param windowDuration represents window duration configuration @@ -669,7 +669,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream. * @param functionFields fields of values to emit with aggregation. * - * @return + * @return the new stream with this operation. */ public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) { @@ -683,7 +683,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { * @param inputFields input fields * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream. * @param functionFields fields of values to emit with aggregation. - * @return + * + * @return the new stream with this operation. */ public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) { // this store is used only for storing triggered aggregated results but not tuples as storeTuplesInStore is set @@ -700,7 +701,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { * @param inputFields input fields * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream. * @param functionFields fields of values to emit with aggregation. - * @return + * + * @return the new stream with this operation. */ public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) { http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java index 8898b13..5125e41 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java @@ -53,7 +53,7 @@ public class WindowTridentProcessor implements TridentProcessor { public static final String TRIGGER_COUNT_PREFIX = "tc" + WindowsStore.KEY_SEPARATOR; public static final String TRIGGER_FIELD_NAME = "_task_info"; - public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100l; + public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100L; private final String windowId; private final Fields inputFields;
