STORM-676 Refactoring of WindowConfig APIs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c263c7c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c263c7c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c263c7c Branch: refs/heads/1.x-branch Commit: 8c263c7c08a3ebd11a1d5df4996b7e12422cd721 Parents: b08d7ea Author: Satish Duggana <[email protected]> Authored: Wed Mar 23 22:35:08 2016 +0530 Committer: Satish Duggana <[email protected]> Committed: Sun Mar 27 10:47:14 2016 +0530 ---------------------------------------------------------------------- .../windowing/AbstractTridentWindowManager.java | 4 +- .../windowing/config/SlidingCountWindow.java | 7 ++- .../windowing/config/SlidingDurationWindow.java | 6 +- .../windowing/config/TumblingCountWindow.java | 8 ++- .../config/TumblingDurationWindow.java | 6 +- .../trident/windowing/config/WindowConfig.java | 4 +- .../strategy/WindowStrategyFactory.java | 60 -------------------- .../storm/trident/TridentWindowingTest.java | 25 ++++---- 8 files changed, 33 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java index aac18d3..f93527a 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java @@ -25,7 +25,6 @@ import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.trident.windowing.config.WindowConfig; import org.apache.storm.trident.windowing.strategy.WindowStrategy; -import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory; import org.apache.storm.windowing.EvictionPolicy; import org.apache.storm.windowing.TriggerPolicy; import org.apache.storm.windowing.WindowLifecycleListener; @@ -34,7 +33,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Queue; import java.util.Set; @@ -71,7 +69,7 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM windowManager = new WindowManager<>(new TridentWindowLifeCycleListener()); - WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig); + WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy(); EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy(); windowManager.setEvictionPolicy(evictionPolicy); triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy); http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java index a0dd13c..2e2d388 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java @@ -18,6 +18,9 @@ */ package org.apache.storm.trident.windowing.config; +import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy; +import org.apache.storm.trident.windowing.strategy.WindowStrategy; + /** * Represents configuration of sliding window based on count of events. Window of length {@code windowLength} slides * at every count of given {@code slideLength} @@ -30,8 +33,8 @@ public final class SlidingCountWindow extends BaseWindowConfig { } @Override - public Type getWindowType() { - return Type.SLIDING_COUNT; + public <T> WindowStrategy<T> getWindowStrategy() { + return new SlidingCountWindowStrategy<>(this); } public static SlidingCountWindow of(int windowCount, int slidingCount) { http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java index f2fe291..befd4e3 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java @@ -19,6 +19,8 @@ package org.apache.storm.trident.windowing.config; import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy; +import org.apache.storm.trident.windowing.strategy.WindowStrategy; /** * Represents configuration of sliding window based on duration. Window duration of {@code windowLength} slides @@ -32,8 +34,8 @@ public final class SlidingDurationWindow extends BaseWindowConfig { } @Override - public Type getWindowType() { - return Type.SLIDING_DURATION; + public <T> WindowStrategy<T> getWindowStrategy() { + return new SlidingDurationWindowStrategy<>(this); } public static SlidingDurationWindow of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration) { http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java index a5f3528..1988850 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java @@ -18,6 +18,10 @@ */ package org.apache.storm.trident.windowing.config; +import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy; +import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy; +import org.apache.storm.trident.windowing.strategy.WindowStrategy; + /** * Represents tumbling count window configuration. Window tumbles at each given {@code windowLength} count of events. */ @@ -28,8 +32,8 @@ public final class TumblingCountWindow extends BaseWindowConfig { } @Override - public Type getWindowType() { - return Type.TUMBLING_COUNT; + public <T> WindowStrategy<T> getWindowStrategy() { + return new TumblingCountWindowStrategy<>(this); } public static TumblingCountWindow of(int windowLength) { http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java index 8beb68d..3881a74 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java @@ -19,6 +19,8 @@ package org.apache.storm.trident.windowing.config; import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy; +import org.apache.storm.trident.windowing.strategy.WindowStrategy; /** * Represents tumbling duration window configuration. Window tumbles every given {@code windowLength} duration. @@ -30,8 +32,8 @@ public final class TumblingDurationWindow extends BaseWindowConfig { } @Override - public Type getWindowType() { - return Type.TUMBLING_DURATION; + public <T> WindowStrategy<T> getWindowStrategy() { + return new TumblingDurationWindowStrategy<>(this); } public static TumblingDurationWindow of(BaseWindowedBolt.Duration windowLength) { http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java index 49347e7..7cb78ee 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java @@ -18,6 +18,8 @@ */ package org.apache.storm.trident.windowing.config; +import org.apache.storm.trident.windowing.strategy.WindowStrategy; + import java.io.Serializable; /** @@ -42,7 +44,7 @@ public interface WindowConfig extends Serializable { * * @return */ - public Type getWindowType(); + public <T> WindowStrategy<T> getWindowStrategy(); public void validate(); http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java deleted file mode 100644 index d8a3918..0000000 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.storm.trident.windowing.strategy; - -import org.apache.storm.trident.windowing.config.WindowConfig; - -/** - * - */ -public final class WindowStrategyFactory { - - private WindowStrategyFactory() { - } - - /** - * Creates a {@code WindowStrategy} instance based on the given {@code windowConfig}. - * - * @param windowConfig - * @return - */ - public static <T> WindowStrategy<T> create(WindowConfig windowConfig) { - WindowStrategy<T> windowStrategy = null; - WindowConfig.Type windowType = windowConfig.getWindowType(); - switch(windowType) { - case SLIDING_COUNT: - windowStrategy = new SlidingCountWindowStrategy<>(windowConfig); - break; - case TUMBLING_COUNT: - windowStrategy = new TumblingCountWindowStrategy<>(windowConfig); - break; - case SLIDING_DURATION: - windowStrategy = new SlidingDurationWindowStrategy<>(windowConfig); - break; - case TUMBLING_DURATION: - windowStrategy = new TumblingDurationWindowStrategy<>(windowConfig); - break; - default: - throw new IllegalArgumentException("Given WindowConfig of type "+windowType+" is not supported"); - } - - return windowStrategy; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java index 03f298d..4b82b89 100644 --- a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java +++ b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java @@ -24,15 +24,8 @@ import org.apache.storm.trident.windowing.config.SlidingCountWindow; import org.apache.storm.trident.windowing.config.SlidingDurationWindow; import org.apache.storm.trident.windowing.config.TumblingCountWindow; import org.apache.storm.trident.windowing.config.TumblingDurationWindow; -import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy; -import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy; -import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy; -import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy; -import org.apache.storm.trident.windowing.strategy.WindowStrategy; -import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory; -import org.junit.After; +import org.apache.storm.trident.windowing.strategy.*; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; @@ -45,19 +38,21 @@ public class TridentWindowingTest { @Test public void testWindowStrategyInstances() throws Exception { - WindowStrategy<Object> tumblingCountStrategy = WindowStrategyFactory.create(TumblingCountWindow.of(10)); + WindowStrategy<Object> tumblingCountStrategy = TumblingCountWindow.of(10).getWindowStrategy(); Assert.assertTrue(tumblingCountStrategy instanceof TumblingCountWindowStrategy); - WindowStrategy<Object> slidingCountStrategy = WindowStrategyFactory.create(SlidingCountWindow.of(100, 10)); + WindowStrategy<Object> slidingCountStrategy = SlidingCountWindow.of(100, 10).getWindowStrategy(); Assert.assertTrue(slidingCountStrategy instanceof SlidingCountWindowStrategy); - WindowStrategy<Object> tumblingDurationStrategy = WindowStrategyFactory.create( - TumblingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))); + WindowStrategy<Object> tumblingDurationStrategy = TumblingDurationWindow.of( + new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)) + .getWindowStrategy(); Assert.assertTrue(tumblingDurationStrategy instanceof TumblingDurationWindowStrategy); - WindowStrategy<Object> slidingDurationStrategy = WindowStrategyFactory.create( - SlidingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), - new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS))); + WindowStrategy<Object> slidingDurationStrategy = SlidingDurationWindow.of( + new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), + new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS)) + .getWindowStrategy(); Assert.assertTrue(slidingDurationStrategy instanceof SlidingDurationWindowStrategy); }
