Repository: flink
Updated Branches:
  refs/heads/release-1.1 5731672e5 -> 05a5f460b


[FLINK-4862] Fix timer registration in ContinuousEventTimeTrigger

Backported to release-1.1.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05a5f460
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05a5f460
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05a5f460

Branch: refs/heads/release-1.1
Commit: 05a5f460b33828cc8a1e6a45d37b555facc7133f
Parents: 5731672
Author: manuzhang <[email protected]>
Authored: Thu Oct 20 15:06:01 2016 +0800
Committer: Maximilian Michels <[email protected]>
Committed: Mon Oct 24 15:54:16 2016 +0200

----------------------------------------------------------------------
 .../triggers/ContinuousEventTimeTrigger.java    | 14 ++--
 .../operators/windowing/WindowOperatorTest.java | 76 ++++++++++++++++++++
 2 files changed, 86 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05a5f460/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 02613f6..cb8cdf5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -89,9 +89,11 @@ public class ContinuousEventTimeTrigger<W extends Window> 
extends Trigger<Object
        @Override
        public void clear(W window, TriggerContext ctx) throws Exception {
                ReducingState<Long> fireTimestamp = 
ctx.getPartitionedState(stateDesc);
-               long timestamp = fireTimestamp.get();
-               ctx.deleteEventTimeTimer(timestamp);
-               fireTimestamp.clear();
+               Long timestamp = fireTimestamp.get();
+               if (timestamp != null) {
+                       ctx.deleteEventTimeTimer(timestamp);
+                       fireTimestamp.clear();
+               }
        }
 
        @Override
@@ -100,8 +102,12 @@ public class ContinuousEventTimeTrigger<W extends Window> 
extends Trigger<Object
        }
 
        @Override
-       public TriggerResult onMerge(W window, OnMergeContext ctx) {
+       public TriggerResult onMerge(W window, OnMergeContext ctx) throws 
Exception {
                ctx.mergePartitionedState(stateDesc);
+               Long nextFireTimestamp = 
ctx.getPartitionedState(stateDesc).get();
+               if (nextFireTimestamp != null) {
+                       ctx.registerEventTimeTimer(nextFireTimestamp);
+               }
                return TriggerResult.CONTINUE;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05a5f460/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 62266c4..dfa353c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -595,6 +596,81 @@ public class WindowOperatorTest {
                testHarness.close();
        }
 
+       /**
+        * This tests whether merging works correctly with the ContinuousEventTimeTrigger.
+        * @throws Exception
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testSessionWindowsWithContinuousEventTimeTrigger() throws 
Exception {
+               closeCalled.set(0);
+
+               final int SESSION_SIZE = 3;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new 
ListStateDescriptor<>("window-contents",
+                       inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
+                       
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                       new TimeWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new InternalIterableWindowFunction<>(new 
SessionWindowFunction()),
+                       ContinuousEventTimeTrigger.of(Time.seconds(2)),
+                       0);
+
+               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               // add elements out-of-order and first trigger time is 2000
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 1500));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 0));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 3), 2500));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 2), 1000));
+
+               // triggers emit and next trigger time is 4000
+               testHarness.processWatermark(new Watermark(2500));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-1", 
1500L, 4500L), 4499));
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 
0L, 5500L), 5499));
+               expectedOutput.add(new Watermark(2500));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 5), 4000));
+               testHarness.processWatermark(new Watermark(3000));
+               expectedOutput.add(new Watermark(3000));
+
+               // do a snapshot, close and restore again
+               StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+               testHarness.close();
+               testHarness.setup();
+               testHarness.restore(snapshot, 0);
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 4000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 4), 3500));
+               // triggers emit and next trigger time is 6000
+               testHarness.processWatermark(new Watermark(4000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 
1500L, 7000L), 6999));
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-15", 
0L, 7000L), 6999));
+               expectedOutput.add(new Watermark(4000));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
+
+               testHarness.close();
+       }
+
        @Test
        public void testMergeAndEvictor() throws Exception {
                // verify that merging WindowAssigner and Evictor cannot be 
used together

Reply via email to