http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 88a5703..824df2d 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -32,9 +32,9 @@ import 
org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.net.URL;
@@ -57,7 +57,6 @@ public class CEPMigration11to13Test {
        }
 
        @Test
-       @Ignore
        public void testKeyedCEPOperatorMigratation() throws Exception {
 
                KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
@@ -136,11 +135,58 @@ public class CEPMigration11to13Test {
                assertEquals(middleEvent, patternMap.get("middle").get(0));
                assertEquals(endEvent, patternMap.get("end").get(0));
 
+               // and now go for a checkpoint with the new serializers
+
+               final Event startEvent1 = new Event(42, "start", 2.0);
+               final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 
11.0);
+               final Event endEvent1 = new Event(42, "end", 2.0);
+
+               harness.processElement(new StreamRecord<Event>(startEvent1, 
21));
+               harness.processElement(new StreamRecord<Event>(middleEvent1, 
23));
+
+               // simulate snapshot/restore with some elements in internal 
sorting queue
+               OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+               harness.close();
+
+               harness = new KeyedOneInputStreamOperatorTestHarness<>(
+                                               new KeyedCEPPatternOperator<>(
+                                                               
Event.createTypeSerializer(),
+                                                               false,
+                                                               
IntSerializer.INSTANCE,
+                                                               new 
NFAFactory(),
+                                                               true),
+                                               keySelector,
+                                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.initializeState(snapshot);
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+               harness.processWatermark(new Watermark(50));
+
+               result = harness.getOutput();
+
+               // watermark and the result
+               assertEquals(2, result.size());
+
+               Object resultObject1 = result.poll();
+               assertTrue(resultObject1 instanceof StreamRecord);
+               StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+               assertTrue(resultRecord1.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, List<Event>> patternMap1 = (Map<String, 
List<Event>>) resultRecord1.getValue();
+
+               assertEquals(startEvent1, patternMap1.get("start").get(0));
+               assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+               assertEquals(endEvent1, patternMap1.get("end").get(0));
+
                harness.close();
        }
 
        @Test
-       @Ignore
        public void testNonKeyedCEPFunctionMigration() throws Exception {
 
                final Event startEvent = new Event(42, "start", 1.0);
@@ -191,7 +237,7 @@ public class CEPMigration11to13Test {
                harness.processElement(new StreamRecord<>(new Event(42, 
"start", 1.0), 4));
                harness.processElement(new StreamRecord<>(endEvent, 5));
 
-               harness.processWatermark(new Watermark(Long.MAX_VALUE));
+               harness.processWatermark(new Watermark(20));
 
                ConcurrentLinkedQueue<Object> result = harness.getOutput();
 
@@ -210,6 +256,54 @@ public class CEPMigration11to13Test {
                assertEquals(middleEvent, patternMap.get("middle").get(0));
                assertEquals(endEvent, patternMap.get("end").get(0));
 
+               // and now go for a checkpoint with the new serializers
+
+               final Event startEvent1 = new Event(42, "start", 2.0);
+               final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 
11.0);
+               final Event endEvent1 = new Event(42, "end", 2.0);
+
+               harness.processElement(new StreamRecord<Event>(startEvent1, 
21));
+               harness.processElement(new StreamRecord<Event>(middleEvent1, 
23));
+
+               // simulate snapshot/restore with some elements in internal 
sorting queue
+               OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+               harness.close();
+
+               harness = new KeyedOneInputStreamOperatorTestHarness<>(
+                                               new KeyedCEPPatternOperator<>(
+                                                               
Event.createTypeSerializer(),
+                                                               false,
+                                                               
ByteSerializer.INSTANCE,
+                                                               new 
NFAFactory(),
+                                                               false),
+                                               keySelector,
+                                               BasicTypeInfo.BYTE_TYPE_INFO);
+
+               harness.setup();
+               harness.initializeState(snapshot);
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+               harness.processWatermark(new Watermark(50));
+
+               result = harness.getOutput();
+
+               // watermark and the result
+               assertEquals(2, result.size());
+
+               Object resultObject1 = result.poll();
+               assertTrue(resultObject1 instanceof StreamRecord);
+               StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+               assertTrue(resultRecord1.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, List<Event>> patternMap1 = (Map<String, 
List<Event>>) resultRecord1.getValue();
+
+               assertEquals(startEvent1, patternMap1.get("start").get(0));
+               assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+               assertEquals(endEvent1, patternMap1.get("end").get(0));
+
                harness.close();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index eb50dfd..41593b0 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -379,7 +380,6 @@ public class CEPOperatorTest extends TestLogger {
                Event middle1Event3 = new Event(41, "a", 4.0);
                Event middle2Event1 = new Event(41, "b", 5.0);
 
-               TestKeySelector keySelector = new TestKeySelector();
                KeyedCEPPatternOperator<Event, Integer> operator = new 
KeyedCEPPatternOperator<>(
                                Event.createTypeSerializer(),
                                false,
@@ -530,7 +530,129 @@ public class CEPOperatorTest extends TestLogger {
 
                 harness.close();
        }
-       
+
+       /**
+        * Runs a keyed CEP operator on top of a {@link RocksDBStateBackend} and verifies the
+        * matches it emits.
+        *
+        * <p>The pattern is "start", followed by one or more "middle" {@link SubEvent}s with
+        * combinations allowed, followed by "end". A middle event is accepted only while the sum
+        * of the prices of the already collected middle events plus its own price is below 5.0.
+        */
+       @Test
+       public void testCEPOperatorSerializationWRocksDB() throws Exception {
+               String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
+               RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
+               rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+
+               final Event startEvent1 = new Event(40, "start", 1.0);
+               final Event startEvent2 = new Event(40, "start", 2.0);
+               final SubEvent middleEvent1 = new SubEvent(40, "foo1", 1.0, 10);
+               final SubEvent middleEvent2 = new SubEvent(40, "foo2", 2.0, 10);
+               final SubEvent middleEvent3 = new SubEvent(40, "foo3", 3.0, 10);
+               final SubEvent middleEvent4 = new SubEvent(40, "foo4", 1.0, 10);
+               final Event nextOne = new Event(40, "next-one", 1.0);
+               final Event endEvent = new Event(40, "end", 1.0);
+
+               final Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               }).followedBy("middle").subtype(SubEvent.class).where(new IterativeCondition<SubEvent>() {
+
+                       private static final long serialVersionUID = 6215754202506583964L;
+
+                       @Override
+                       public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
+                               if (!value.getName().startsWith("foo")) {
+                                       return false;
+                               }
+
+                               // accept only while the price total of "middle", including this event, stays below 5.0
+                               double sum = 0.0;
+                               for (Event event : ctx.getEventsForPattern("middle")) {
+                                       sum += event.getPrice();
+                               }
+                               sum += value.getPrice();
+                               return Double.compare(sum, 5.0) < 0;
+                       }
+               }).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 7056763917392056548L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               });
+
+               KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
+                               Event.createTypeSerializer(),
+                               false,
+                               IntSerializer.INSTANCE,
+                               new NFACompiler.NFAFactory<Event>() {
+
+                                       private static final long serialVersionUID = 477082663248051994L;
+
+                                       @Override
+                                       public NFA<Event> createNFA() {
+                                               return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+                                       }
+                               },
+                               true);
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
+               try {
+                       harness.setStateBackend(rocksDBStateBackend);
+                       harness.open();
+
+                       harness.processWatermark(0L);
+                       harness.processElement(new StreamRecord<>(startEvent1, 1));
+                       harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+                       harness.processWatermark(2L);
+                       harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+                       harness.processElement(new StreamRecord<>(startEvent2, 4));
+                       harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
+                       harness.processWatermark(5L);
+                       harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
+                       harness.processElement(new StreamRecord<>(nextOne, 6));
+                       harness.processElement(new StreamRecord<>(endEvent, 8));
+                       harness.processWatermark(100L);
+
+                       // flatten every emitted match into one list of events; watermarks are skipped
+                       List<List<Event>> resultingPatterns = new ArrayList<>();
+                       while (!harness.getOutput().isEmpty()) {
+                               Object o = harness.getOutput().poll();
+                               if (!(o instanceof Watermark)) {
+                                       @SuppressWarnings("unchecked")
+                                       StreamRecord<Map<String, List<Event>>> el = (StreamRecord<Map<String, List<Event>>>) o;
+                                       List<Event> res = new ArrayList<>();
+                                       for (List<Event> le: el.getValue().values()) {
+                                               res.addAll(le);
+                                       }
+                                       resultingPatterns.add(res);
+                               }
+                       }
+
+                       compareMaps(resultingPatterns,
+                                       Lists.<List<Event>>newArrayList(
+                                                       Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
+                                                       Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+                                                       Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
+                                                       Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+                                                       Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
+                                                       Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+                                                       Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+                                       )
+                       );
+               } finally {
+                       // always shut the harness down, even when an assertion above fails
+                       harness.close();
+               }
+       }
+
        private void verifyWatermark(Object outputObject, long timestamp) {
                assertTrue(outputObject instanceof Watermark);
                assertEquals(timestamp, ((Watermark) 
outputObject).getTimestamp());

Reply via email to