Repository: flink
Updated Branches:
  refs/heads/master 1adefee2e -> bd2fce6e1


[FLINK-4174] Add accessor for current watermark in Evictor Context


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd2fce6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd2fce6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd2fce6e

Branch: refs/heads/master
Commit: bd2fce6e1aa8d1f568092488946e5cda44c0cb81
Parents: 1adefee
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Nov 15 14:52:16 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Nov 15 14:52:16 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/windowing/evictors/Evictor.java     | 7 +++++--
 .../runtime/operators/windowing/EvictingWindowOperator.java | 9 +++++++--
 .../runtime/operators/windowing/WindowOperator.java         | 2 +-
 .../operators/windowing/EvictingWindowOperatorTest.java     | 2 --
 4 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
index 02e93eb..7557766 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -70,8 +70,7 @@ public interface Evictor<T, W extends Window> extends Serializable {
        interface EvictorContext {
 
                /**
-                * Returns the current processing time, as returned by
-                * the {@link ProcessingTimeService#getCurrentProcessingTime}.
+                * Returns the current processing time.
                 */
                long getCurrentProcessingTime();
 
@@ -86,6 +85,10 @@ public interface Evictor<T, W extends Window> extends Serializable {
                 */
                MetricGroup getMetricGroup();
 
+               /**
+                * Returns the current watermark time.
+                */
+               long getCurrentWatermark();
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 3be3f5a..150f46e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -63,7 +63,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
        private final Evictor<? super IN, ? super W> evictor;
 
-       protected transient EvictorContext evictorContext = new EvictorContext(null, null);
+       private transient EvictorContext evictorContext;
 
        private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;
 
@@ -348,7 +348,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
                @Override
                public long getCurrentProcessingTime() {
-                       return EvictingWindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+                       return internalTimerService.currentProcessingTime();
+               }
+
+               @Override
+               public long getCurrentWatermark() {
+                       return internalTimerService.currentWatermark();
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 6ff3999..0ead14a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -139,7 +139,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        // State that needs to be checkpointed
        // ------------------------------------------------------------------------
 
-       private transient InternalTimerService<W> internalTimerService;
+       protected transient InternalTimerService<W> internalTimerService;
 
        /**
         * Creates a new {@code WindowOperator} based on the given policies and user functions.

http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 46495b0..8da1d7c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -59,8 +59,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class EvictingWindowOperatorTest {
 
-       // For counting if close() is called the correct number of times on the SumReducer
-
        /**
         * Tests CountEvictor evictAfter behavior
         * @throws Exception

Reply via email to