Repository: bahir-flink Updated Branches: refs/heads/master 50f3f125c -> 0eceb1da2
[BAHIR-156] Improved integration test cases This closes #24 Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/0eceb1da Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/0eceb1da Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/0eceb1da Branch: refs/heads/master Commit: 0eceb1da2d6799065b5c4e14aefa23209279fef8 Parents: 50f3f12 Author: Aleksandr Chermenin <aleksandr_cherme...@epam.com> Authored: Fri Dec 1 17:09:34 2017 +0300 Committer: Robert Metzger <rmetz...@apache.org> Committed: Mon Dec 25 18:26:36 2017 +0100 ---------------------------------------------------------------------- .../org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java | 2 ++ .../flink/streaming/siddhi/source/RandomEventSource.java | 9 +++++---- .../flink/streaming/siddhi/source/RandomTupleSource.java | 9 +++++---- .../flink/streaming/siddhi/source/RandomWordSource.java | 9 +++++---- 4 files changed, 17 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/0eceb1da/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java index 5c16c71..821c594 100755 --- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java @@ -277,6 +277,8 @@ public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implement @Test public void testUnboundedPojoStreamSimplePatternMatch() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1"); DataStream<Event> input2 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input2"); http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/0eceb1da/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java index bb95fdd..0742054 100644 --- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.siddhi.source; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; public class RandomEventSource implements SourceFunction<Event> { private final int count; @@ -27,7 +28,7 @@ public class RandomEventSource implements SourceFunction<Event> { private final long initialTimestamp; private volatile boolean isRunning = true; - private volatile int number = 0; + private volatile AtomicInteger number = new AtomicInteger(0); private volatile long closeDelayTimestamp = 1000; public RandomEventSource(int count, long initialTimestamp) { @@ -52,9 +53,9 @@ public class RandomEventSource implements SourceFunction<Event> { @Override public void run(SourceContext<Event> ctx) throws Exception { while (isRunning) { - ctx.collect(Event.of(number, "test_event", random.nextDouble(), initialTimestamp + 1000 * number)); - number++; - if (number >= this.count) { + long timestamp = initialTimestamp + 1000 * number.get(); + ctx.collectWithTimestamp(Event.of(number.get(), "test_event", random.nextDouble(), timestamp), timestamp); + if (number.incrementAndGet() >= this.count) { cancel(); } } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/0eceb1da/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java index 35121f7..f3b571a 100644 --- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java @@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String, Double, Long>> { private final int count; @@ -28,7 +29,7 @@ public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String, private final long initialTimestamp; private volatile boolean isRunning = true; - private volatile int number = 0; + private volatile AtomicInteger number = new AtomicInteger(0); private long closeDelayTimestamp; public RandomTupleSource(int count, long initialTimestamp) { @@ -54,9 +55,9 @@ public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String, @Override public void run(SourceContext<Tuple4<Integer, String, Double, Long>> ctx) throws Exception { while (isRunning) { - ctx.collect(Tuple4.of(number, "test_tuple", random.nextDouble(), initialTimestamp + 1000 * number)); - number++; - if (number >= this.count) { + long timestamp = initialTimestamp + 1000 * number.get(); + ctx.collectWithTimestamp(Tuple4.of(number.get(), "test_tuple", random.nextDouble(), timestamp), timestamp); + if (number.incrementAndGet() >= this.count) { cancel(); } } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/0eceb1da/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java index 19d904f..1c17240 100644 --- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.siddhi.source; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; public class RandomWordSource implements SourceFunction<String> { private static final String[] WORDS = new String[] { @@ -65,7 +66,7 @@ public class RandomWordSource implements SourceFunction<String> { private final long initialTimestamp; private volatile boolean isRunning = true; - private volatile int number = 0; + private volatile AtomicInteger number = new AtomicInteger(0); private long closeDelayTimestamp; public RandomWordSource(int count, long initialTimestamp) { @@ -91,9 +92,9 @@ public class RandomWordSource implements SourceFunction<String> { @Override public void run(SourceContext<String> ctx) throws Exception { while (isRunning) { - ctx.collectWithTimestamp(WORDS[random.nextInt(WORDS.length)], initialTimestamp + 1000 * number); - number++; - if (number >= this.count) { + long timestamp = initialTimestamp + 1000 * number.get(); + ctx.collectWithTimestamp(WORDS[random.nextInt(WORDS.length)], timestamp); + if (number.incrementAndGet() >= this.count) { cancel(); } }