Repository: flink
Updated Branches:
  refs/heads/master e33243729 -> 45762162f


[FLINK-4862] fix Timer register in ContinuousEventTimeTrigger

This closes #2671


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45762162
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45762162
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45762162

Branch: refs/heads/master
Commit: 45762162fe7c23fd921db1e0f826b2906bfe1dcd
Parents: e332437
Author: manuzhang <[email protected]>
Authored: Thu Oct 20 15:06:01 2016 +0800
Committer: Maximilian Michels <[email protected]>
Committed: Mon Oct 24 15:49:18 2016 +0200

----------------------------------------------------------------------
 .../triggers/ContinuousEventTimeTrigger.java    | 14 ++--
 .../operators/windowing/WindowOperatorTest.java | 71 ++++++++++++++++++++
 2 files changed, 81 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/45762162/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 46080ff..fa5bb2f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -88,9 +88,11 @@ public class ContinuousEventTimeTrigger<W extends Window> 
extends Trigger<Object
        @Override
        public void clear(W window, TriggerContext ctx) throws Exception {
                ReducingState<Long> fireTimestamp = 
ctx.getPartitionedState(stateDesc);
-               long timestamp = fireTimestamp.get();
-               ctx.deleteEventTimeTimer(timestamp);
-               fireTimestamp.clear();
+               Long timestamp = fireTimestamp.get();
+               if (timestamp != null) {
+                       ctx.deleteEventTimeTimer(timestamp);
+                       fireTimestamp.clear();
+               }
        }
 
        @Override
@@ -99,8 +101,12 @@ public class ContinuousEventTimeTrigger<W extends Window> 
extends Trigger<Object
        }
 
        @Override
-       public TriggerResult onMerge(W window, OnMergeContext ctx) {
+       public TriggerResult onMerge(W window, OnMergeContext ctx) throws 
Exception {
                ctx.mergePartitionedState(stateDesc);
+               Long nextFireTimestamp = 
ctx.getPartitionedState(stateDesc).get();
+               if (nextFireTimestamp != null) {
+                       ctx.registerEventTimeTimer(nextFireTimestamp);
+               }
                return TriggerResult.CONTINUE;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/45762162/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index b8a764e..9e50aaa 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -569,6 +569,77 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.close();
        }
 
+       /**
+        * This tests whether merging works correctly with the 
ContinuousEventTimeTrigger.
+        * @throws Exception
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testSessionWindowsWithContinuousEventTimeTrigger() throws 
Exception {
+               closeCalled.set(0);
+
+               final int SESSION_SIZE = 3;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new 
ListStateDescriptor<>("window-contents",
+                       inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
+                       
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                       new TimeWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new InternalIterableWindowFunction<>(new 
SessionWindowFunction()),
+                       ContinuousEventTimeTrigger.of(Time.seconds(2)),
+                       0);
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               // add elements out-of-order and first trigger time is 2000
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 1500));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 0));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 3), 2500));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 2), 1000));
+
+               // triggers emit and next trigger time is 4000
+               testHarness.processWatermark(new Watermark(2500));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-1", 
1500L, 4500L), 4499));
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 
0L, 5500L), 5499));
+               expectedOutput.add(new Watermark(2500));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 5), 4000));
+               testHarness.processWatermark(new Watermark(3000));
+               expectedOutput.add(new Watermark(3000));
+
+               // do a snapshot, close and restore again
+               StreamStateHandle snapshot = testHarness.snapshotLegacy(0L, 0L);
+               testHarness.close();
+               testHarness.setup();
+               testHarness.restore(snapshot);
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 4000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 4), 3500));
+               // triggers emit and next trigger time is 6000
+               testHarness.processWatermark(new Watermark(4000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 
1500L, 7000L), 6999));
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-15", 
0L, 7000L), 6999));
+               expectedOutput.add(new Watermark(4000));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
+
+               testHarness.close();
+       }
+
        @Test
        public void testMergeAndEvictor() throws Exception {
                // verify that merging WindowAssigner and Evictor cannot be 
used together

Reply via email to