[hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key ranges


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c34ff1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c34ff1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c34ff1f

Branch: refs/heads/master
Commit: 5c34ff1f2b6f8b7c35885646be5b60701cc10348
Parents: f41b00e
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon Apr 30 18:04:43 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 22 15:50:54 2018 +0800

----------------------------------------------------------------------
 .../tests/SequenceGeneratorSource.java          | 28 ++++++++++++++++++++
 1 file changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c34ff1f/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
index e641551..40c0db5 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
@@ -91,7 +91,14 @@ public class SequenceGeneratorSource extends 
RichParallelSourceFunction<Event> i
 
        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
+               if (keyRanges.size() > 0) {
+                       runActive(ctx);
+               } else {
+                       runIdle(ctx);
+               }
+       }
 
+       private void runActive(SourceContext<Event> ctx) throws Exception {
                Random random = new Random();
 
                // this holds the current event time, from which generated 
events can up to +/- (maxOutOfOrder).
@@ -133,6 +140,27 @@ public class SequenceGeneratorSource extends 
RichParallelSourceFunction<Event> i
                }
        }
 
+       private void runIdle(SourceContext<Event> ctx) throws Exception {
+               ctx.markAsTemporarilyIdle();
+
+               // just wait until this source is canceled
+               final Object waitLock = new Object();
+               while (running) {
+                       try {
+                               //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                               synchronized (waitLock) {
+                                       waitLock.wait();
+                               }
+                       }
+                       catch (InterruptedException e) {
+                               if (!running) {
+                                       // restore the interrupted state, and 
fall through the loop
+                                       Thread.currentThread().interrupt();
+                               }
+                       }
+               }
+       }
+
        private long generateEventTimeWithOutOfOrderness(Random random, long 
correctTime) {
                return correctTime - maxOutOfOrder + ((random.nextLong() & 
Long.MAX_VALUE) % (2 * maxOutOfOrder));
        }

Reply via email to