Repository: flink
Updated Branches:
  refs/heads/master d0695c054 -> d20fb090c


[FLINK-3318] Backward compatibility of CEP NFA


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20fb090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20fb090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20fb090

Branch: refs/heads/master
Commit: d20fb090c31858bc0372a8c84228d796558d56b0
Parents: 9001c4e
Author: Dawid Wysakowicz <[email protected]>
Authored: Tue Mar 21 10:53:07 2017 +0100
Committer: kl0u <[email protected]>
Committed: Thu Mar 23 10:47:55 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 162 ++++++-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  49 ++
 .../java/org/apache/flink/cep/nfa/State.java    |  19 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     | 116 +++++
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 194 +++++++-
 .../cep/operator/CEPMigration12to13Test.java    | 477 +++++++++++++++++++
 .../test/resources/cep-branching-snapshot-1.2   | Bin 0 -> 6736 bytes
 .../resources/cep-single-pattern-snapshot-1.2   | Bin 0 -> 3311 bytes
 .../test/resources/cep-starting-snapshot-1.2    | Bin 0 -> 6526 bytes
 9 files changed, 986 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 3d42248..ab03566 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedHashMultimap;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -25,18 +27,22 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
+import javax.annotation.Nullable;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OptionalDataException;
 import java.io.Serializable;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -85,9 +91,9 @@ public class NFA<T> implements Serializable {
        private final NonDuplicatingTypeSerializer<T> 
nonDuplicatingTypeSerializer;
 
        /**
-        *      Buffer used to store the matched events.
+        *      Used only for backward compatibility. Buffer used to store the 
matched events.
         */
-       private final SharedBuffer<String, T> sharedBuffer;
+       private final SharedBuffer<State<T>, T> sharedBuffer = null;
 
        /**
         * A set of all the valid NFA states, as returned by the
@@ -110,12 +116,22 @@ public class NFA<T> implements Serializable {
        private final boolean handleTimeout;
 
        /**
+        * Used only for backward compatibility.
+        */
+       private int startEventCounter;
+
+       /**
         * Current set of {@link ComputationState computation states} within 
the state machine.
         * These are the "active" intermediate states that are waiting for new 
matching
         * events to transition to new valid states.
         */
        private transient Queue<ComputationState<T>> computationStates;
 
+       /**
+        *      Buffer used to store the matched events.
+        */
+       private final SharedBuffer<String, T> stringSharedBuffer;
+
        public NFA(
                        final TypeSerializer<T> eventSerializer,
                        final long windowTime,
@@ -124,7 +140,7 @@ public class NFA<T> implements Serializable {
                this.nonDuplicatingTypeSerializer = new 
NonDuplicatingTypeSerializer<>(eventSerializer);
                this.windowTime = windowTime;
                this.handleTimeout = handleTimeout;
-               sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+               stringSharedBuffer = new 
SharedBuffer<>(nonDuplicatingTypeSerializer);
                computationStates = new LinkedList<>();
 
                states = new HashSet<>();
@@ -156,7 +172,7 @@ public class NFA<T> implements Serializable {
         * {@code false} otherwise.
         */
        public boolean isEmpty() {
-               return sharedBuffer.isEmpty();
+               return stringSharedBuffer.isEmpty();
        }
 
        /**
@@ -194,9 +210,14 @@ public class NFA<T> implements Serializable {
                                        }
                                }
 
-                               // remove computation state which has exceeded 
the window length
-                               
sharedBuffer.release(computationState.getState().getName(), 
computationState.getEvent(), computationState.getTimestamp());
-                               
sharedBuffer.remove(computationState.getState().getName(), 
computationState.getEvent(), computationState.getTimestamp());
+                               stringSharedBuffer.release(
+                                               
computationState.getPreviousState().getName(),
+                                               computationState.getEvent(),
+                                               
computationState.getTimestamp());
+                               stringSharedBuffer.remove(
+                                               
computationState.getPreviousState().getName(),
+                                               computationState.getEvent(),
+                                               
computationState.getTimestamp());
 
                                newComputationStates = Collections.emptyList();
                        } else if (event != null) {
@@ -212,8 +233,8 @@ public class NFA<T> implements Serializable {
                                        result.addAll(matches);
 
                                        // remove found patterns because they 
are no longer needed
-                                       
sharedBuffer.release(newComputationState.getPreviousState().getName(), 
newComputationState.getEvent(), newComputationState.getTimestamp());
-                                       
sharedBuffer.remove(newComputationState.getPreviousState().getName(), 
newComputationState.getEvent(), newComputationState.getTimestamp());
+                                       
stringSharedBuffer.release(newComputationState.getPreviousState().getName(), 
newComputationState.getEvent(), newComputationState.getTimestamp());
+                                       
stringSharedBuffer.remove(newComputationState.getPreviousState().getName(), 
newComputationState.getEvent(), newComputationState.getTimestamp());
                                } else {
                                        // add new computation state; it will 
be processed once the next event arrives
                                        
computationStates.add(newComputationState);
@@ -230,7 +251,7 @@ public class NFA<T> implements Serializable {
 
                                // remove all elements which are expired
                                // with respect to the window length
-                               sharedBuffer.prune(pruningTimestamp);
+                               stringSharedBuffer.prune(pruningTimestamp);
                        }
                }
 
@@ -244,7 +265,7 @@ public class NFA<T> implements Serializable {
                        NFA<T> other = (NFA<T>) obj;
 
                        return 
nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
-                               sharedBuffer.equals(other.sharedBuffer) &&
+                               
stringSharedBuffer.equals(other.stringSharedBuffer) &&
                                states.equals(other.states) &&
                                windowTime == other.windowTime;
                } else {
@@ -254,7 +275,7 @@ public class NFA<T> implements Serializable {
 
        @Override
        public int hashCode() {
-               return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, 
states, windowTime);
+               return Objects.hash(nonDuplicatingTypeSerializer, 
stringSharedBuffer, states, windowTime);
        }
 
        private static <T> boolean isEquivalentState(final State<T> s1, final 
State<T> s2) {
@@ -376,7 +397,7 @@ public class NFA<T> implements Serializable {
                                                                
computationState.getStartTimestamp()
                                                        )
                                                );
-                                               sharedBuffer.lock(
+                                               stringSharedBuffer.lock(
                                                        
computationState.getPreviousState().getName(),
                                                        
computationState.getEvent(),
                                                        
computationState.getTimestamp());
@@ -397,14 +418,14 @@ public class NFA<T> implements Serializable {
                                        final long startTimestamp;
                                        if (computationState.isStartState()) {
                                                startTimestamp = timestamp;
-                                               sharedBuffer.put(
+                                               stringSharedBuffer.put(
                                                        
consumingState.getName(),
                                                        event,
                                                        timestamp,
                                                        currentVersion);
                                        } else {
                                                startTimestamp = 
computationState.getStartTimestamp();
-                                               sharedBuffer.put(
+                                               stringSharedBuffer.put(
                                                        
consumingState.getName(),
                                                        event,
                                                        timestamp,
@@ -415,7 +436,7 @@ public class NFA<T> implements Serializable {
                                        }
 
                                        // a new computation state is referring 
to the shared entry
-                                       
sharedBuffer.lock(consumingState.getName(), event, timestamp);
+                                       
stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
 
                                        
resultingComputationStates.add(ComputationState.createState(
                                                newState,
@@ -429,7 +450,7 @@ public class NFA<T> implements Serializable {
                                        //check if newly created state is 
optional (have a PROCEED path to Final state)
                                        final State<T> finalState = 
findFinalStateAfterProceed(newState, event);
                                        if (finalState != null) {
-                                               
sharedBuffer.lock(consumingState.getName(), event, timestamp);
+                                               
stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
                                                
resultingComputationStates.add(ComputationState.createState(
                                                        finalState,
                                                        consumingState,
@@ -450,12 +471,12 @@ public class NFA<T> implements Serializable {
 
                if (computationState.getEvent() != null) {
                        // release the shared entry referenced by the current 
computation state.
-                       sharedBuffer.release(
+                       stringSharedBuffer.release(
                                computationState.getPreviousState().getName(),
                                computationState.getEvent(),
                                computationState.getTimestamp());
                        // try to remove unnecessary shared buffer entries
-                       sharedBuffer.remove(
+                       stringSharedBuffer.remove(
                                computationState.getPreviousState().getName(),
                                computationState.getEvent(),
                                computationState.getTimestamp());
@@ -546,7 +567,7 @@ public class NFA<T> implements Serializable {
         * @return Collection of event sequences which end in the given 
computation state
         */
        private Collection<Map<String, T>> extractPatternMatches(final 
ComputationState<T> computationState) {
-               Collection<LinkedHashMultimap<String, T>> paths = 
sharedBuffer.extractPatterns(
+               Collection<LinkedHashMultimap<String, T>> paths = 
stringSharedBuffer.extractPatterns(
                        computationState.getPreviousState().getName(),
                        computationState.getEvent(),
                        computationState.getTimestamp(),
@@ -592,6 +613,8 @@ public class NFA<T> implements Serializable {
                nonDuplicatingTypeSerializer.clearReferences();
        }
 
+       private final static String BEGINNING_STATE_NAME = "$beginningState$";
+
        private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
                ois.defaultReadObject();
 
@@ -599,15 +622,103 @@ public class NFA<T> implements Serializable {
 
                computationStates = new LinkedList<>();
 
+               final List<ComputationState<T>> readComputationStates = new 
ArrayList<>(numberComputationStates);
+
+               boolean afterMigration = false;
                for (int i = 0; i < numberComputationStates; i++) {
                        ComputationState<T> computationState = 
readComputationState(ois);
+                       if 
(computationState.getState().getName().equals(BEGINNING_STATE_NAME)) {
+                               afterMigration = true;
+                       }
 
-                       computationStates.offer(computationState);
+                       readComputationStates.add(computationState);
+               }
+
+               if (afterMigration && !readComputationStates.isEmpty()) {
+                       try {
+                               //Backwards compatibility
+                               
this.computationStates.addAll(migrateNFA(readComputationStates));
+                               final Field newSharedBufferField = 
NFA.class.getDeclaredField("stringSharedBuffer");
+                               final Field sharedBufferField = 
NFA.class.getDeclaredField("sharedBuffer");
+                               sharedBufferField.setAccessible(true);
+                               newSharedBufferField.setAccessible(true);
+                               newSharedBufferField.set(this, 
SharedBuffer.migrateSharedBuffer(this.sharedBuffer));
+                               sharedBufferField.set(this, null);
+                               sharedBufferField.setAccessible(false);
+                               newSharedBufferField.setAccessible(false);
+                       } catch (Exception e) {
+                               throw new IllegalStateException("Could not 
migrate from earlier version", e);
+                       }
+               } else {
+                       this.computationStates.addAll(readComputationStates);
                }
 
                nonDuplicatingTypeSerializer.clearReferences();
        }
 
+       /**
+        * Needed for backward compatibility. First migrates the {@link State} 
graph; see {@link NFACompiler#migrateGraph(State)}.
+        * Then recreates the {@link ComputationState}s with the new {@link 
State} graph.
+        * @param readStates computation states read from snapshot
+        * @return collection of migrated computation states
+        */
+       private Collection<ComputationState<T>> 
migrateNFA(Collection<ComputationState<T>> readStates) {
+               final ArrayList<ComputationState<T>> computationStates = new 
ArrayList<>();
+
+               final State<T> startState = Iterators.find(
+                       readStates.iterator(),
+                       new Predicate<ComputationState<T>>() {
+                               @Override
+                               public boolean apply(@Nullable 
ComputationState<T> input) {
+                                       return input != null && 
input.getState().getName().equals(BEGINNING_STATE_NAME);
+                               }
+                       }).getState();
+
+               final Map<String, State<T>> convertedStates = 
NFACompiler.migrateGraph(startState);
+
+               for (ComputationState<T> readState : readStates) {
+                       if (!readState.isStartState()) {
+                               final String previousName = 
readState.getState().getName();
+                               final String currentName = Iterators.find(
+                                       
readState.getState().getStateTransitions().iterator(),
+                                       new Predicate<StateTransition<T>>() {
+                                               @Override
+                                               public boolean apply(@Nullable 
StateTransition<T> input) {
+                                                       return input != null && 
input.getAction() == StateTransitionAction.TAKE;
+                                               }
+                                       }).getTargetState().getName();
+
+
+                               final State<T> previousState = 
convertedStates.get(previousName);
+
+                               
computationStates.add(ComputationState.createState(
+                                       convertedStates.get(currentName),
+                                       previousState,
+                                       readState.getEvent(),
+                                       readState.getTimestamp(),
+                                       readState.getVersion(),
+                                       readState.getStartTimestamp()
+                               ));
+                       }
+               }
+
+               final String startName = 
Iterators.find(convertedStates.values().iterator(), new Predicate<State<T>>() {
+                       @Override
+                       public boolean apply(@Nullable State<T> input) {
+                               return input != null && input.isStart();
+                       }
+               }).getName();
+
+               computationStates.add(ComputationState.createStartState(
+                       convertedStates.get(startName),
+                       new DeweyNumber(this.startEventCounter)));
+
+               this.states.clear();
+               this.states.addAll(convertedStates.values());
+
+               return computationStates;
+       }
+
        private void writeComputationState(final ComputationState<T> 
computationState, final ObjectOutputStream oos) throws IOException {
                oos.writeObject(computationState.getState());
                oos.writeObject(computationState.getPreviousState());
@@ -629,7 +740,13 @@ public class NFA<T> implements Serializable {
        @SuppressWarnings("unchecked")
        private ComputationState<T> readComputationState(ObjectInputStream ois) 
throws IOException, ClassNotFoundException {
                final State<T> state = (State<T>)ois.readObject();
-               final State<T> previousState = (State<T>)ois.readObject();
+               State<T> previousState;
+               try {
+                       previousState = (State<T>)ois.readObject();
+               } catch (OptionalDataException e) {
+                       previousState = null;
+               }
+
                final long timestamp = ois.readLong();
                final DeweyNumber version = (DeweyNumber)ois.readObject();
                final long startTimestamp = ois.readLong();
@@ -647,6 +764,7 @@ public class NFA<T> implements Serializable {
                return ComputationState.createState(state, previousState, 
event, timestamp, version, startTimestamp);
        }
 
+
        /**
         * Generates a state name from a given name template and an index.
         * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index e6a8c75..d5b7876 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.LinkedHashMultimap;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -463,6 +464,54 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                }
        }
 
+       private SharedBuffer(
+               TypeSerializer<V> valueSerializer,
+               Map<K, SharedBufferPage<K, V>> pages) {
+               this.valueSerializer = valueSerializer;
+               this.pages = pages;
+       }
+
+       /**
+        * For backward compatibility only. Previously the key in {@link 
SharedBuffer} was {@link State}.
+        * Now it is {@link String}.
+        */
+       @Internal
+       static <T> SharedBuffer<String, T> 
migrateSharedBuffer(SharedBuffer<State<T>, T> buffer) {
+
+               final Map<String, SharedBufferPage<String, T>> pageMap = new 
HashMap<>();
+               final Map<SharedBufferEntry<State<T>, T>, 
SharedBufferEntry<String, T>> entries = new HashMap<>();
+
+               for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : 
buffer.pages.entrySet()) {
+                       final SharedBufferPage<String, T> newPage = new 
SharedBufferPage<>(page.getKey().getName());
+                       pageMap.put(newPage.getKey(), newPage);
+
+                       for (Map.Entry<ValueTimeWrapper<T>, 
SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) 
{
+                               final SharedBufferEntry<String, T> 
newSharedBufferEntry = new SharedBufferEntry<>(
+                                       pageEntry.getKey(),
+                                       newPage);
+                               newSharedBufferEntry.referenceCounter = 
pageEntry.getValue().referenceCounter;
+                               entries.put(pageEntry.getValue(), 
newSharedBufferEntry);
+                               newPage.entries.put(pageEntry.getKey(), 
newSharedBufferEntry);
+                       }
+               }
+
+               for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : 
buffer.pages.entrySet()) {
+                       for (Map.Entry<ValueTimeWrapper<T>, 
SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) 
{
+                               final SharedBufferEntry<String, T> newEntry = 
entries.get(pageEntry.getValue());
+                               for (SharedBufferEdge<State<T>, T> edge : 
pageEntry.getValue().edges) {
+                                       final SharedBufferEntry<String, T> 
targetNewEntry = entries.get(edge.getTarget());
+
+                                       final SharedBufferEdge<String, T> 
newEdge = new SharedBufferEdge<>(
+                                               targetNewEntry,
+                                               edge.getVersion());
+                                       newEntry.edges.add(newEdge);
+                               }
+                       }
+               }
+
+               return new SharedBuffer<>(buffer.valueSerializer, pageMap);
+       }
+
        private SharedBufferEntry<K, V> get(
                final K key,
                final V value,

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 7bcb6ea..27e0dcd 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -20,9 +20,12 @@ package org.apache.flink.cep.nfa;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -62,7 +65,6 @@ public class State<T> implements Serializable {
                return stateTransitions;
        }
 
-
        private void addStateTransition(
                final StateTransitionAction action,
                final State<T> targetState,
@@ -132,4 +134,19 @@ public class State<T> implements Serializable {
                Final, // the state is a final state for the NFA
                Normal // the state is neither a start nor a final state
        }
+
+       private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
+               ois.defaultReadObject();
+
+               //Backward compatibility. The previous version of StateTransition 
did not have a source state
+               if (!stateTransitions.isEmpty() && 
stateTransitions.iterator().next().getSourceState() == null) {
+                       final List<StateTransition<T>> tmp = new ArrayList<>();
+                       tmp.addAll(this.stateTransitions);
+
+                       this.stateTransitions.clear();
+                       for (StateTransition<T> transition : tmp) {
+                               addStateTransition(transition.getAction(), 
transition.getTargetState(), transition.getCondition());
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index b476c49..8bd8612 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,10 +18,15 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
+import org.apache.flink.cep.nfa.StateTransitionAction;
 import org.apache.flink.cep.pattern.FilterFunctions;
 import org.apache.flink.cep.pattern.FollowedByPattern;
 import org.apache.flink.cep.pattern.MalformedPatternException;
@@ -31,12 +36,15 @@ import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
+import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -363,6 +371,114 @@ public class NFACompiler {
        }
 
        /**
+        * Used for migrating CEP graphs prior to 1.3. It removes the dummy 
start, adds the dummy end, and translates all
+        * states to consuming ones by moving all TAKEs and IGNOREs to the next 
state. This method assumes each state
+        * has at most one TAKE and one IGNORE, and that the name of each state is 
unique. No PROCEED transition is allowed!
+        *
+        * @param oldStartState dummy start state of old graph
+        * @param <T> type of events
+        * @return map of new states, where key is the name of a state and 
value is the state itself
+        */
+       @Internal
+       public static <T> Map<String, State<T>> migrateGraph(State<T> 
oldStartState) {
+               State<T> oldFirst = oldStartState;
+               State<T> oldSecond = 
oldStartState.getStateTransitions().iterator().next().getTargetState();
+
+               StateTransition<T> oldFirstToSecondTake = Iterators.find(
+                       oldFirst.getStateTransitions().iterator(),
+                       new Predicate<StateTransition<T>>() {
+                               @Override
+                               public boolean apply(@Nullable 
StateTransition<T> input) {
+                                       return input != null && 
input.getAction() == StateTransitionAction.TAKE;
+                               }
+
+                       });
+
+               StateTransition<T> oldFirstIgnore = Iterators.find(
+                       oldFirst.getStateTransitions().iterator(),
+                       new Predicate<StateTransition<T>>() {
+                               @Override
+                               public boolean apply(@Nullable 
StateTransition<T> input) {
+                                       return input != null && 
input.getAction() == StateTransitionAction.IGNORE;
+                               }
+
+                       }, null);
+
+               StateTransition<T> oldSecondToThirdTake = Iterators.find(
+                       oldSecond.getStateTransitions().iterator(),
+                       new Predicate<StateTransition<T>>() {
+                               @Override
+                               public boolean apply(@Nullable 
StateTransition<T> input) {
+                                       return input != null && 
input.getAction() == StateTransitionAction.TAKE;
+                               }
+
+                       }, null);
+
+               final Map<String, State<T>> convertedStates = new HashMap<>();
+               State<T> newSecond;
+               State<T> newFirst = new State<>(oldSecond.getName(), 
State.StateType.Start);
+               convertedStates.put(newFirst.getName(), newFirst);
+               while (oldSecondToThirdTake != null) {
+
+                       newSecond = new 
State<T>(oldSecondToThirdTake.getTargetState().getName(), 
State.StateType.Normal);
+                       convertedStates.put(newSecond.getName(), newSecond);
+                       newFirst.addTake(newSecond, 
oldFirstToSecondTake.getCondition());
+
+                       if (oldFirstIgnore != null) {
+                               
newFirst.addIgnore(oldFirstIgnore.getCondition());
+                       }
+
+                       oldFirst = oldSecond;
+
+                       oldFirstToSecondTake = Iterators.find(
+                               oldFirst.getStateTransitions().iterator(),
+                               new Predicate<StateTransition<T>>() {
+                                       @Override
+                                       public boolean apply(@Nullable 
StateTransition<T> input) {
+                                               return input != null && 
input.getAction() == StateTransitionAction.TAKE;
+                                       }
+
+                               });
+
+                       oldFirstIgnore = Iterators.find(
+                               oldFirst.getStateTransitions().iterator(),
+                               new Predicate<StateTransition<T>>() {
+                                       @Override
+                                       public boolean apply(@Nullable 
StateTransition<T> input) {
+                                               return input != null && 
input.getAction() == StateTransitionAction.IGNORE;
+                                       }
+
+                               }, null);
+
+                       oldSecond = oldSecondToThirdTake.getTargetState();
+
+                       oldSecondToThirdTake = Iterators.find(
+                               oldSecond.getStateTransitions().iterator(),
+                               new Predicate<StateTransition<T>>() {
+                                       @Override
+                                       public boolean apply(@Nullable 
StateTransition<T> input) {
+                                               return input != null && 
input.getAction() == StateTransitionAction.TAKE;
+                                       }
+
+                               }, null);
+
+                       newFirst = newSecond;
+               }
+
+               final State<T> endingState = new State<>(ENDING_STATE_NAME, 
State.StateType.Final);
+
+               newFirst.addTake(endingState, 
oldFirstToSecondTake.getCondition());
+
+               if (oldFirstIgnore != null) {
+                       newFirst.addIgnore(oldFirstIgnore.getCondition());
+               }
+
+               convertedStates.put(endingState.getName(), endingState);
+
+               return convertedStates;
+       }
+
+       /**
         * Factory interface for {@link NFA}.
         *
         * @param <T> Type of the input events which are processed by the NFA

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 825ba957..5b05f19 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class NFAITCase extends TestLogger {
 
@@ -421,7 +422,7 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testComplexBranchingAfterKleeneStar() {
+       public void testComplexBranchingAfterZeroOrMore() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
                Event startEvent = new Event(40, "c", 1.0);
@@ -519,7 +520,7 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testKleeneStar() {
+       public void testZeroOrMore() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
                Event startEvent = new Event(40, "c", 1.0);
@@ -581,7 +582,7 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testEagerKleeneStar() {
+       public void testEagerZeroOrMore() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
                Event startEvent = new Event(40, "c", 1.0);
@@ -646,7 +647,7 @@ public class NFAITCase extends TestLogger {
 
 
        @Test
-       public void testBeginWithKleeneStar() {
+       public void testBeginWithZeroOrMore() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
                Event middleEvent1 = new Event(40, "a", 2.0);
@@ -704,7 +705,7 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testKleeneStarAfterKleeneStar() {
+       public void testZeroOrMoreAfterZeroOrMore() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
                Event startEvent = new Event(40, "c", 1.0);
@@ -779,7 +780,7 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testKleeneStarAfterBranching() {
+       public void testZeroOrMoreAfterBranching() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
                Event startEvent = new Event(40, "c", 1.0);
@@ -865,7 +866,7 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testStrictContinuityNoResultsAfterKleeneStar() {
+       public void testStrictContinuityNoResultsAfterZeroOrMore() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
                Event start = new Event(40, "d", 2.0);
@@ -923,7 +924,7 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testStrictContinuityResultsAfterKleeneStar() {
+       public void testStrictContinuityResultsAfterZeroOrMore() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
                Event start = new Event(40, "d", 2.0);
@@ -1663,4 +1664,181 @@ public class NFAITCase extends TestLogger {
                ), resultingPatterns);
        }
 
+       /**
+        * Clearing SharedBuffer
+        */
+
+       @Test
+       public void testTimesClearingBuffer() {
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 3.0);
+               Event middleEvent3 = new Event(43, "a", 4.0);
+               Event end1 = new Event(44, "b", 5.0);
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).next("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).times(2).followedBy("end1").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               }).within(Time.milliseconds(8));
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               nfa.process(startEvent, 1);
+               nfa.process(middleEvent1, 2);
+               nfa.process(middleEvent2, 3);
+               nfa.process(middleEvent3, 4);
+               nfa.process(end1, 6);
+
+               //pruning element
+               nfa.process(null, 10);
+
+               assertEquals(true, nfa.isEmpty());
+       }
+
+       @Test
+       public void testOptionalClearingBuffer() {
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent = new Event(43, "a", 4.0);
+               Event end1 = new Event(44, "b", 5.0);
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).optional().followedBy("end1").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               }).within(Time.milliseconds(8));
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               nfa.process(startEvent, 1);
+               nfa.process(middleEvent, 5);
+               nfa.process(end1, 6);
+
+               //pruning element
+               nfa.process(null, 10);
+
+               assertEquals(true, nfa.isEmpty());
+       }
+
+       @Test
+       public void testAtLeastOneClearingBuffer() {
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 3.0);
+               Event end1 = new Event(44, "b", 5.0);
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore(false).followedBy("end1").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               }).within(Time.milliseconds(8));
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               nfa.process(startEvent, 1);
+               nfa.process(middleEvent1, 3);
+               nfa.process(middleEvent2, 4);
+               nfa.process(end1, 6);
+
+               //pruning element
+               nfa.process(null, 10);
+
+               assertEquals(true, nfa.isEmpty());
+       }
+
+
+       @Test
+       public void testZeroOrMoreClearingBuffer() {
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 3.0);
+               Event end1 = new Event(44, "b", 5.0);
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).zeroOrMore(false).followedBy("end1").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               }).within(Time.milliseconds(8));
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               nfa.process(startEvent, 1);
+               nfa.process(middleEvent1, 3);
+               nfa.process(middleEvent2, 4);
+               nfa.process(end1, 6);
+
+               //pruning element
+               nfa.process(null, 10);
+
+               assertEquals(true, nfa.isEmpty());
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
new file mode 100644
index 0000000..65fa733
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CEPMigration12to13Test {
+
+       private static String getResourceFilename(String filename) {
+               ClassLoader cl = CEPMigration12to13Test.class.getClassLoader();
+               URL resource = cl.getResource(filename);
+               if (resource == null) {
+                       throw new NullPointerException("Missing snapshot 
resource.");
+               }
+               return resource.getFile();
+       }
+
+       @Test
+       public void testMigrationAfterBranchingPattern() throws Exception {
+
+               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
+                       private static final long serialVersionUID = 
-4873366487571254798L;
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               };
+
+               final Event startEvent = new Event(42, "start", 1.0);
+               final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 
10.0);
+               final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 
10.0);
+               final Event endEvent = new Event(42, "end", 1.0);
+
+               // uncomment these lines for regenerating the snapshot on Flink 
1.2
+//             OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+//                     new KeyedOneInputStreamOperatorTestHarness<>(
+//                             new KeyedCEPPatternOperator<>(
+//                                     Event.createTypeSerializer(),
+//                                     false,
+//                                     keySelector,
+//                                     IntSerializer.INSTANCE,
+//                                     new NFAFactory(),
+//                                     true),
+//                             keySelector,
+//                             BasicTypeInfo.INT_TYPE_INFO);
+//
+//             harness.setup();
+//             harness.open();
+//             harness.processElement(new StreamRecord<Event>(startEvent, 1));
+//             harness.processElement(new StreamRecord<Event>(new Event(42, 
"foobar", 1.0), 2));
+//             harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
+//             harness.processElement(new StreamRecord<Event>(middleEvent1, 
2));
+//             harness.processElement(new StreamRecord<Event>(middleEvent2, 
3));
+//             harness.processWatermark(new Watermark(5));
+//             // simulate snapshot/restore with empty element queue but NFA 
state
+//             OperatorStateHandles snapshot = harness.snapshot(1, 1);
+//             FileOutputStream out = new FileOutputStream(
+//                             
"src/test/resources/cep-branching-snapshot-1.2");
+//             ObjectOutputStream oos = new ObjectOutputStream(out);
+//             oos.writeObject(snapshot.getOperatorChainIndex());
+//             oos.writeObject(snapshot.getLegacyOperatorState());
+//             oos.writeObject(snapshot.getManagedKeyedState());
+//             oos.writeObject(snapshot.getRawKeyedState());
+//             oos.writeObject(snapshot.getManagedOperatorState());
+//             oos.writeObject(snapshot.getRawOperatorState());
+//             out.close();
+//             harness.close();
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               new KeyedCEPPatternOperator<>(
+                                       Event.createTypeSerializer(),
+                                       false,
+                                       keySelector,
+                                       IntSerializer.INSTANCE,
+                                       new NFAFactory(),
+                                       true),
+                               keySelector,
+                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               final ObjectInputStream ois = new ObjectInputStream(new 
FileInputStream(getResourceFilename(
+                       "cep-branching-snapshot-1.2")));
+               final OperatorStateHandles snapshot = new OperatorStateHandles(
+                       (int) ois.readObject(),
+                       (StreamStateHandle) ois.readObject(),
+                       (Collection<KeyGroupsStateHandle>) ois.readObject(),
+                       (Collection<KeyGroupsStateHandle>) ois.readObject(),
+                       (Collection<OperatorStateHandle>) ois.readObject(),
+                       (Collection<OperatorStateHandle>) ois.readObject()
+               );
+               harness.initializeState(snapshot);
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(new Event(42, 
"start", 1.0), 4));
+               harness.processElement(new StreamRecord<>(endEvent, 5));
+
+               harness.processWatermark(new Watermark(20));
+
+               ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+               // watermark and 2 results
+               assertEquals(3, result.size());
+
+               Object resultObject1 = result.poll();
+               assertTrue(resultObject1 instanceof StreamRecord);
+               StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+               assertTrue(resultRecord1.getValue() instanceof Map);
+
+               Object resultObject2 = result.poll();
+               assertTrue(resultObject2 instanceof StreamRecord);
+               StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+               assertTrue(resultRecord2.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap1 = (Map<String, Event>) 
resultRecord1.getValue();
+
+               assertEquals(startEvent, patternMap1.get("start"));
+               assertEquals(middleEvent1, patternMap1.get("middle"));
+               assertEquals(endEvent, patternMap1.get("end"));
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap2 = (Map<String, Event>) 
resultRecord2.getValue();
+
+               assertEquals(startEvent, patternMap2.get("start"));
+               assertEquals(middleEvent2, patternMap2.get("middle"));
+               assertEquals(endEvent, patternMap2.get("end"));
+
+               harness.close();
+       }
+
+       @Test
+       public void testStartingNewPatternAfterMigration() throws Exception {
+
+               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
+                       private static final long serialVersionUID = 
-4873366487571254798L;
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               };
+
+               final Event startEvent1 = new Event(42, "start", 1.0);
+               final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 
10.0);
+               final Event startEvent2 = new Event(42, "start", 5.0);
+               final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 
10.0);
+               final Event endEvent = new Event(42, "end", 1.0);
+
+               // uncomment these lines for regenerating the snapshot on Flink 
1.2
+//             OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+//                     new KeyedOneInputStreamOperatorTestHarness<>(
+//                             new KeyedCEPPatternOperator<>(
+//                                     Event.createTypeSerializer(),
+//                                     false,
+//                                     keySelector,
+//                                     IntSerializer.INSTANCE,
+//                                     new NFAFactory(),
+//                                     true),
+//                             keySelector,
+//                             BasicTypeInfo.INT_TYPE_INFO);
+//
+//             harness.setup();
+//             harness.open();
+//             harness.processElement(new StreamRecord<Event>(startEvent1, 1));
+//             harness.processElement(new StreamRecord<Event>(new Event(42, 
"foobar", 1.0), 2));
+//             harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
+//             harness.processElement(new StreamRecord<Event>(middleEvent1, 
2));
+//             harness.processWatermark(new Watermark(5));
+//             // simulate snapshot/restore with empty element queue but NFA 
state
+//             OperatorStateHandles snapshot = harness.snapshot(1, 1);
+//             FileOutputStream out = new FileOutputStream(
+//                             "src/test/resources/cep-starting-snapshot-1.2");
+//             ObjectOutputStream oos = new ObjectOutputStream(out);
+//             oos.writeObject(snapshot.getOperatorChainIndex());
+//             oos.writeObject(snapshot.getLegacyOperatorState());
+//             oos.writeObject(snapshot.getManagedKeyedState());
+//             oos.writeObject(snapshot.getRawKeyedState());
+//             oos.writeObject(snapshot.getManagedOperatorState());
+//             oos.writeObject(snapshot.getRawOperatorState());
+//             out.close();
+//             harness.close();
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               new KeyedCEPPatternOperator<>(
+                                       Event.createTypeSerializer(),
+                                       false,
+                                       keySelector,
+                                       IntSerializer.INSTANCE,
+                                       new NFAFactory(),
+                                       true),
+                               keySelector,
+                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               final ObjectInputStream ois = new ObjectInputStream(new 
FileInputStream(getResourceFilename(
+                       "cep-starting-snapshot-1.2")));
+               final OperatorStateHandles snapshot = new OperatorStateHandles(
+                       (int) ois.readObject(),
+                       (StreamStateHandle) ois.readObject(),
+                       (Collection<KeyGroupsStateHandle>) ois.readObject(),
+                       (Collection<KeyGroupsStateHandle>) ois.readObject(),
+                       (Collection<OperatorStateHandle>) ois.readObject(),
+                       (Collection<OperatorStateHandle>) ois.readObject()
+               );
+               harness.initializeState(snapshot);
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(startEvent2, 5));
+               harness.processElement(new StreamRecord<Event>(middleEvent2, 
6));
+               harness.processElement(new StreamRecord<>(endEvent, 7));
+
+               harness.processWatermark(new Watermark(20));
+
+               ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+               // watermark and 3 results
+               assertEquals(4, result.size());
+
+               Object resultObject1 = result.poll();
+               assertTrue(resultObject1 instanceof StreamRecord);
+               StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+               assertTrue(resultRecord1.getValue() instanceof Map);
+
+               Object resultObject2 = result.poll();
+               assertTrue(resultObject2 instanceof StreamRecord);
+               StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+               assertTrue(resultRecord2.getValue() instanceof Map);
+
+               Object resultObject3 = result.poll();
+               assertTrue(resultObject3 instanceof StreamRecord);
+               StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+               assertTrue(resultRecord3.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap1 = (Map<String, Event>) 
resultRecord1.getValue();
+
+               assertEquals(startEvent1, patternMap1.get("start"));
+               assertEquals(middleEvent1, patternMap1.get("middle"));
+               assertEquals(endEvent, patternMap1.get("end"));
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap2 = (Map<String, Event>) 
resultRecord2.getValue();
+
+               assertEquals(startEvent1, patternMap2.get("start"));
+               assertEquals(middleEvent2, patternMap2.get("middle"));
+               assertEquals(endEvent, patternMap2.get("end"));
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap3 = (Map<String, Event>) 
resultRecord3.getValue();
+
+               assertEquals(startEvent2, patternMap3.get("start"));
+               assertEquals(middleEvent2, patternMap3.get("middle"));
+               assertEquals(endEvent, patternMap3.get("end"));
+
+               harness.close();
+       }
+
+       @Test
+       public void testSinglePatternAfterMigration() throws Exception {
+
+               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
+                       private static final long serialVersionUID = 
-4873366487571254798L;
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               };
+
+               final Event startEvent1 = new Event(42, "start", 1.0);
+
+               // uncomment these lines for regenerating the snapshot on Flink 
1.2
+//             OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+//                     new KeyedOneInputStreamOperatorTestHarness<>(
+//                             new KeyedCEPPatternOperator<>(
+//                                     Event.createTypeSerializer(),
+//                                     false,
+//                                     keySelector,
+//                                     IntSerializer.INSTANCE,
+//                                     new SinglePatternNFAFactory(),
+//                                     true),
+//                             keySelector,
+//                             BasicTypeInfo.INT_TYPE_INFO);
+//
+//             harness.setup();
+//             harness.open();
+//             harness.processWatermark(new Watermark(5));
+//             // simulate snapshot/restore with empty element queue but NFA 
state
+//             OperatorStateHandles snapshot = harness.snapshot(1, 1);
+//             FileOutputStream out = new FileOutputStream(
+//                             
"src/test/resources/cep-single-pattern-snapshot-1.2");
+//             ObjectOutputStream oos = new ObjectOutputStream(out);
+//             oos.writeObject(snapshot.getOperatorChainIndex());
+//             oos.writeObject(snapshot.getLegacyOperatorState());
+//             oos.writeObject(snapshot.getManagedKeyedState());
+//             oos.writeObject(snapshot.getRawKeyedState());
+//             oos.writeObject(snapshot.getManagedOperatorState());
+//             oos.writeObject(snapshot.getRawOperatorState());
+//             out.close();
+//             harness.close();
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               new KeyedCEPPatternOperator<>(
+                                       Event.createTypeSerializer(),
+                                       false,
+                                       keySelector,
+                                       IntSerializer.INSTANCE,
+                                       new SinglePatternNFAFactory(),
+                                       true),
+                               keySelector,
+                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               final ObjectInputStream ois = new ObjectInputStream(new 
FileInputStream(getResourceFilename(
+                       "cep-single-pattern-snapshot-1.2")));
+               final OperatorStateHandles snapshot = new OperatorStateHandles(
+                       (int) ois.readObject(),
+                       (StreamStateHandle) ois.readObject(),
+                       (Collection<KeyGroupsStateHandle>) ois.readObject(),
+                       (Collection<KeyGroupsStateHandle>) ois.readObject(),
+                       (Collection<OperatorStateHandle>) ois.readObject(),
+                       (Collection<OperatorStateHandle>) ois.readObject()
+               );
+               harness.initializeState(snapshot);
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(startEvent1, 5));
+
+               harness.processWatermark(new Watermark(20));
+
+               ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+               // watermark and the result
+               assertEquals(2, result.size());
+
+               Object resultObject = result.poll();
+               assertTrue(resultObject instanceof StreamRecord);
+               StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+               assertTrue(resultRecord.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
+
+               assertEquals(startEvent1, patternMap.get("start"));
+
+               harness.close();
+       }
+
+       private static class SinglePatternNFAFactory implements 
NFACompiler.NFAFactory<Event> {
+
+               private static final long serialVersionUID = 
1173020762472766713L;
+
+               private final boolean handleTimeout;
+
+               private SinglePatternNFAFactory() {
+                       this(false);
+               }
+
+               private SinglePatternNFAFactory(boolean handleTimeout) {
+                       this.handleTimeout = handleTimeout;
+               }
+
+               @Override
+               public NFA<Event> createNFA() {
+
+                       Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new StartFilter())
+                               .within(Time.milliseconds(10L));
+
+                       return NFACompiler.compile(pattern, 
Event.createTypeSerializer(), handleTimeout);
+               }
+       }
+
+       private static class NFAFactory implements 
NFACompiler.NFAFactory<Event> {
+
+               private static final long serialVersionUID = 
1173020762472766713L;
+
+               private final boolean handleTimeout;
+
+               private NFAFactory() {
+                       this(false);
+               }
+
+               private NFAFactory(boolean handleTimeout) {
+                       this.handleTimeout = handleTimeout;
+               }
+
+               @Override
+               public NFA<Event> createNFA() {
+
+                       Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new StartFilter())
+                               .followedBy("middle")
+                               .subtype(SubEvent.class)
+                               .where(new MiddleFilter())
+                               .followedBy("end")
+                               .where(new EndFilter())
+                               // add a window timeout to test whether 
timestamps of elements in the
+                               // priority queue in CEP operator are correctly 
checkpointed/restored
+                               .within(Time.milliseconds(10L));
+
+                       return NFACompiler.compile(pattern, 
Event.createTypeSerializer(), handleTimeout);
+               }
+       }
+
+       private static class StartFilter implements FilterFunction<Event> {
+               private static final long serialVersionUID = 
5726188262756267490L;
+
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("start");
+               }
+       }
+
+       private static class MiddleFilter implements FilterFunction<SubEvent> {
+               private static final long serialVersionUID = 
6215754202506583964L;
+
+               @Override
+               public boolean filter(SubEvent value) throws Exception {
+                       return value.getVolume() > 5.0;
+               }
+       }
+
+       private static class EndFilter implements FilterFunction<Event> {
+               private static final long serialVersionUID = 
7056763917392056548L;
+
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("end");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 
b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
new file mode 100644
index 0000000..47f710e
Binary files /dev/null and 
b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 
b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
new file mode 100644
index 0000000..255f46a
Binary files /dev/null and 
b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 
b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
new file mode 100644
index 0000000..c41f6c2
Binary files /dev/null and 
b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 differ

Reply via email to