Repository: bahir-flink
Updated Branches:
  refs/heads/master 50f3f125c -> 0eceb1da2


[BAHIR-156] Improved integration test cases

This closes #24


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/0eceb1da
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/0eceb1da
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/0eceb1da

Branch: refs/heads/master
Commit: 0eceb1da2d6799065b5c4e14aefa23209279fef8
Parents: 50f3f12
Author: Aleksandr Chermenin <aleksandr_cherme...@epam.com>
Authored: Fri Dec 1 17:09:34 2017 +0300
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Mon Dec 25 18:26:36 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java  | 2 ++
 .../flink/streaming/siddhi/source/RandomEventSource.java    | 9 +++++----
 .../flink/streaming/siddhi/source/RandomTupleSource.java    | 9 +++++----
 .../flink/streaming/siddhi/source/RandomWordSource.java     | 9 +++++----
 4 files changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/0eceb1da/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
index 5c16c71..821c594 100755
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
@@ -277,6 +277,8 @@ public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implement
     @Test
     public void testUnboundedPojoStreamSimplePatternMatch() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
         DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1");
         DataStream<Event> input2 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input2");
 

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/0eceb1da/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
index bb95fdd..0742054 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.siddhi.source;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class RandomEventSource implements SourceFunction<Event> {
     private final int count;
@@ -27,7 +28,7 @@ public class RandomEventSource implements SourceFunction<Event> {
     private final long initialTimestamp;
 
     private volatile boolean isRunning = true;
-    private volatile int number = 0;
+    private volatile AtomicInteger number = new AtomicInteger(0);
     private volatile long closeDelayTimestamp = 1000;
 
     public RandomEventSource(int count, long initialTimestamp) {
@@ -52,9 +53,9 @@ public class RandomEventSource implements SourceFunction<Event> {
     @Override
     public void run(SourceContext<Event> ctx) throws Exception {
         while (isRunning) {
-            ctx.collect(Event.of(number, "test_event", random.nextDouble(), initialTimestamp + 1000 * number));
-            number++;
-            if (number >= this.count) {
+            long timestamp = initialTimestamp + 1000 * number.get();
+            ctx.collectWithTimestamp(Event.of(number.get(), "test_event", random.nextDouble(), timestamp), timestamp);
+            if (number.incrementAndGet() >= this.count) {
                 cancel();
             }
         }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/0eceb1da/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
index 35121f7..f3b571a 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String, Double, Long>> {
     private final int count;
@@ -28,7 +29,7 @@ public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String,
     private final long initialTimestamp;
 
     private volatile boolean isRunning = true;
-    private volatile int number = 0;
+    private volatile AtomicInteger number = new AtomicInteger(0);
     private long closeDelayTimestamp;
 
     public RandomTupleSource(int count, long initialTimestamp) {
@@ -54,9 +55,9 @@ public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String,
     @Override
     public void run(SourceContext<Tuple4<Integer, String, Double, Long>> ctx) throws Exception {
         while (isRunning) {
-            ctx.collect(Tuple4.of(number, "test_tuple", random.nextDouble(), initialTimestamp + 1000 * number));
-            number++;
-            if (number >= this.count) {
+            long timestamp = initialTimestamp + 1000 * number.get();
+            ctx.collectWithTimestamp(Tuple4.of(number.get(), "test_tuple", random.nextDouble(), timestamp), timestamp);
+            if (number.incrementAndGet() >= this.count) {
                 cancel();
             }
         }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/0eceb1da/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
index 19d904f..1c17240 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.siddhi.source;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class RandomWordSource implements SourceFunction<String> {
     private static final String[] WORDS = new String[] {
@@ -65,7 +66,7 @@ public class RandomWordSource implements SourceFunction<String> {
     private final long initialTimestamp;
 
     private volatile boolean isRunning = true;
-    private volatile int number = 0;
+    private volatile AtomicInteger number = new AtomicInteger(0);
     private long closeDelayTimestamp;
 
     public RandomWordSource(int count, long initialTimestamp) {
@@ -91,9 +92,9 @@ public class RandomWordSource implements SourceFunction<String> {
     @Override
     public void run(SourceContext<String> ctx) throws Exception {
         while (isRunning) {
-            ctx.collectWithTimestamp(WORDS[random.nextInt(WORDS.length)], initialTimestamp + 1000 * number);
-            number++;
-            if (number >= this.count) {
+            long timestamp = initialTimestamp + 1000 * number.get();
+            ctx.collectWithTimestamp(WORDS[random.nextInt(WORDS.length)], timestamp);
+            if (number.incrementAndGet() >= this.count) {
                 cancel();
             }
         }

Reply via email to