Repository: flink
Updated Branches:
  refs/heads/master 1adefee2e -> bd2fce6e1
[FLINK-4174] Add accessor for current watermark in Evictor Context

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd2fce6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd2fce6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd2fce6e

Branch: refs/heads/master
Commit: bd2fce6e1aa8d1f568092488946e5cda44c0cb81
Parents: 1adefee
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Nov 15 14:52:16 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Nov 15 14:52:16 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/windowing/evictors/Evictor.java     | 7 +++++--
 .../runtime/operators/windowing/EvictingWindowOperator.java | 9 +++++++--
 .../runtime/operators/windowing/WindowOperator.java         | 2 +-
 .../operators/windowing/EvictingWindowOperatorTest.java     | 2 --
 4 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
index 02e93eb..7557766 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -70,8 +70,7 @@ public interface Evictor<T, W extends Window> extends Serializable {
     interface EvictorContext {
 
         /**
-         * Returns the current processing time, as returned by
-         * the {@link ProcessingTimeService#getCurrentProcessingTime}.
+         * Returns the current processing time.
          */
         long getCurrentProcessingTime();
 
@@ -86,6 +85,10 @@ public interface Evictor<T, W extends Window> extends Serializable {
          */
         MetricGroup getMetricGroup();
 
+        /**
+         * Returns the current watermark time.
+         */
+        long getCurrentWatermark();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 3be3f5a..150f46e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -63,7 +63,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
     private final Evictor<? super IN, ? super W> evictor;
 
-    protected transient EvictorContext evictorContext = new EvictorContext(null, null);
+    private transient EvictorContext evictorContext;
 
     private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;
 
@@ -348,7 +348,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
         @Override
         public long getCurrentProcessingTime() {
-            return EvictingWindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+            return internalTimerService.currentProcessingTime();
+        }
+
+        @Override
+        public long getCurrentWatermark() {
+            return internalTimerService.currentWatermark();
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 6ff3999..0ead14a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -139,7 +139,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
     // State that needs to be checkpointed
     // ------------------------------------------------------------------------
 
-    private transient InternalTimerService<W> internalTimerService;
+    protected transient InternalTimerService<W> internalTimerService;
 
     /**
      * Creates a new {@code WindowOperator} based on the given policies and user functions.
http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 46495b0..8da1d7c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -59,8 +59,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class EvictingWindowOperatorTest {
 
-    // For counting if close() is called the correct number of times on the SumReducer
-
     /**
      * Tests CountEvictor evictAfter behavior
      * @throws Exception
