[hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key ranges
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c34ff1f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c34ff1f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c34ff1f Branch: refs/heads/master Commit: 5c34ff1f2b6f8b7c35885646be5b60701cc10348 Parents: f41b00e Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon Apr 30 18:04:43 2018 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 15:50:54 2018 +0800 ---------------------------------------------------------------------- .../tests/SequenceGeneratorSource.java | 28 ++++++++++++++++++++ 1 file changed, 28 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5c34ff1f/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java index e641551..40c0db5 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java @@ -91,7 +91,14 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i @Override public void run(SourceContext<Event> ctx) throws Exception { + if (keyRanges.size() > 0) { + runActive(ctx); + } else { + runIdle(ctx); + } + } + private void runActive(SourceContext<Event> ctx) throws Exception { Random random = new Random(); // this holds the current event time, from which generated events can up to +/- (maxOutOfOrder). @@ -133,6 +140,27 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i } } + private void runIdle(SourceContext<Event> ctx) throws Exception { + ctx.markAsTemporarilyIdle(); + + // just wait until this source is canceled + final Object waitLock = new Object(); + while (running) { + try { + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (waitLock) { + waitLock.wait(); + } + } + catch (InterruptedException e) { + if (!running) { + // restore the interrupted state, and fall through the loop + Thread.currentThread().interrupt(); + } + } + } + } + private long generateEventTimeWithOutOfOrderness(Random random, long correctTime) { return correctTime - maxOutOfOrder + ((random.nextLong() & Long.MAX_VALUE) % (2 * maxOutOfOrder)); }
