[FLINK-6604] [cep] Remove java serialization from the library.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d80af819
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d80af819
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d80af819

Branch: refs/heads/release-1.3
Commit: d80af81972ba5adf291d891881aee26b97ec7a60
Parents: 34a6020
Author: kkloudas <kklou...@gmail.com>
Authored: Tue May 16 17:07:29 2017 +0200
Committer: kkloudas <kklou...@gmail.com>
Committed: Wed May 17 14:40:27 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |  98 ++-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++++++++++++++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 596 ++++++++++++++-----
 .../java/org/apache/flink/cep/nfa/State.java    |   2 +
 .../flink/cep/nfa/compiler/NFACompiler.java     |  81 ++-
 .../AbstractKeyedCEPPatternOperator.java        |  16 +-
 .../java/org/apache/flink/cep/nfa/NFATest.java  | 182 ++++--
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  14 +-
 .../cep/operator/CEPFrom12MigrationTest.java    |  99 ++-
 .../cep/operator/CEPMigration11to13Test.java    | 102 +++-
 .../flink/cep/operator/CEPOperatorTest.java     | 110 +++-
 11 files changed, 1499 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index fd3fafa..3827956 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 
@@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable {
                deweyNumber = new int[]{start};
        }
 
-       protected DeweyNumber(int[] deweyNumber) {
-               this.deweyNumber = deweyNumber;
-       }
-
        public DeweyNumber(DeweyNumber number) {
                this.deweyNumber = Arrays.copyOf(number.deweyNumber, 
number.deweyNumber.length);
        }
 
+       private DeweyNumber(int[] deweyNumber) {
+               this.deweyNumber = deweyNumber;
+       }
+
        /**
         * Checks whether this dewey number is compatible to the other dewey 
number.
         *
@@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable {
                        return new DeweyNumber(deweyNumber);
                }
        }
+
+       /**
+        * A {@link TypeSerializer} for the {@link DeweyNumber} which serves as 
a version number.
+        */
+       public static class DeweyNumberSerializer extends 
TypeSerializerSingleton<DeweyNumber> {
+
+               private static final long serialVersionUID = 
-5086792497034943656L;
+
+               private final IntSerializer elemSerializer = 
IntSerializer.INSTANCE;
+
+               @Override
+               public boolean isImmutableType() {
+                       return false;
+               }
+
+               @Override
+               public DeweyNumber createInstance() {
+                       return new DeweyNumber(1);
+               }
+
+               @Override
+               public DeweyNumber copy(DeweyNumber from) {
+                       return new DeweyNumber(from);
+               }
+
+               @Override
+               public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) {
+                       return copy(from);
+               }
+
+               @Override
+               public int getLength() {
+                       return -1;
+               }
+
+               @Override
+               public void serialize(DeweyNumber record, DataOutputView 
target) throws IOException {
+                       final int size = record.length();
+                       target.writeInt(size);
+                       for (int i = 0; i < size; i++) {
+                               elemSerializer.serialize(record.deweyNumber[i], 
target);
+                       }
+               }
+
+               @Override
+               public DeweyNumber deserialize(DataInputView source) throws 
IOException {
+                       final int size = source.readInt();
+                       int[] number = new int[size];
+                       for (int i = 0; i < size; i++) {
+                               number[i] = elemSerializer.deserialize(source);
+                       }
+                       return new DeweyNumber(number);
+               }
+
+               @Override
+               public DeweyNumber deserialize(DeweyNumber reuse, DataInputView 
source) throws IOException {
+                       return deserialize(source);
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       final int size = source.readInt();
+                       target.writeInt(size);
+                       for (int i = 0; i < size; i++) {
+                               elemSerializer.copy(source, target);
+                       }
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return obj == this || obj.getClass().equals(getClass());
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return true;
+               }
+
+               @Override
+               public int hashCode() {
+                       return elemSerializer.hashCode();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index f2ade9e..ab5cd8e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -21,11 +21,17 @@ package org.apache.flink.cep.nfa;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.EnumSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
@@ -86,19 +92,35 @@ public class NFA<T> implements Serializable {
 
        private static final long serialVersionUID = 2957674889294717265L;
 
-       private final NonDuplicatingTypeSerializer<T> 
nonDuplicatingTypeSerializer;
+       /////////////////////                   Backwards Compatibility Fields  
                /////////////////////
 
        /**
-        * Used only for backwards compatibility. Buffer used to store the 
matched events.
+        * @deprecated Used only for backwards compatibility.
+        * Look at the {@link #eventSharedBuffer} for its replacement.
         */
+       @Deprecated
        private final SharedBuffer<State<T>, T> sharedBuffer = null;
 
        /**
+        * @deprecated Used only for backward compatibility.
+        */
+       @Deprecated
+       private int startEventCounter;
+
+       /**
+        * @deprecated Used only for backwards compatibility.
+        */
+       @Deprecated
+       private final NonDuplicatingTypeSerializer<T> 
nonDuplicatingTypeSerializer;
+
+       //////////////////                      End of Backwards Compatibility 
Fields                   //////////////////
+
+       /**
         * A set of all the valid NFA states, as returned by the
         * {@link NFACompiler NFACompiler}.
         * These are directly derived from the user-specified pattern.
         */
-       private final Set<State<T>> states;
+       private Set<State<T>> states;
 
        /**
         * The length of a windowed pattern, as specified using the
@@ -114,11 +136,6 @@ public class NFA<T> implements Serializable {
        private final boolean handleTimeout;
 
        /**
-        * Used only for backward compatibility.
-        */
-       private int startEventCounter;
-
-       /**
         * Current set of {@link ComputationState computation states} within 
the state machine.
         * These are the "active" intermediate states that are waiting for new 
matching
         * events to transition to new valid states.
@@ -126,19 +143,22 @@ public class NFA<T> implements Serializable {
        private transient Queue<ComputationState<T>> computationStates;
 
        /**
-        *      Buffer used to store the matched events.
+        * Buffer used to store the matched events.
         */
-       private final SharedBuffer<String, T> stringSharedBuffer;
+       private SharedBuffer<String, T> eventSharedBuffer;
+
+       private TypeSerializer<T> eventSerializer;
 
        public NFA(
                        final TypeSerializer<T> eventSerializer,
                        final long windowTime,
                        final boolean handleTimeout) {
 
+               this.eventSerializer = eventSerializer;
                this.nonDuplicatingTypeSerializer = new 
NonDuplicatingTypeSerializer<>(eventSerializer);
                this.windowTime = windowTime;
                this.handleTimeout = handleTimeout;
-               this.stringSharedBuffer = new 
SharedBuffer<>(nonDuplicatingTypeSerializer);
+               this.eventSharedBuffer = new 
SharedBuffer<>(nonDuplicatingTypeSerializer);
                this.computationStates = new LinkedList<>();
                this.states = new HashSet<>();
        }
@@ -169,7 +189,7 @@ public class NFA<T> implements Serializable {
         * {@code false} otherwise.
         */
        public boolean isEmpty() {
-               return stringSharedBuffer.isEmpty();
+               return eventSharedBuffer.isEmpty();
        }
 
        /**
@@ -207,7 +227,7 @@ public class NFA<T> implements Serializable {
                                        
timeoutResult.add(Tuple2.of(timedoutPattern, timestamp));
                                }
 
-                               stringSharedBuffer.release(
+                               eventSharedBuffer.release(
                                                
computationState.getPreviousState().getName(),
                                                computationState.getEvent(),
                                                computationState.getTimestamp(),
@@ -231,7 +251,7 @@ public class NFA<T> implements Serializable {
                                        result.add(matchedPattern);
 
                                        // remove found patterns because they 
are no longer needed
-                                       stringSharedBuffer.release(
+                                       eventSharedBuffer.release(
                                                        
newComputationState.getPreviousState().getName(),
                                                        
newComputationState.getEvent(),
                                                        
newComputationState.getTimestamp(),
@@ -239,7 +259,7 @@ public class NFA<T> implements Serializable {
                                } else if (newComputationState.isStopState()) {
                                        //reached stop state. release entry for 
the stop state
                                        shouldDiscardPath = true;
-                                       stringSharedBuffer.release(
+                                       eventSharedBuffer.release(
                                                
newComputationState.getPreviousState().getName(),
                                                newComputationState.getEvent(),
                                                
newComputationState.getTimestamp(),
@@ -254,7 +274,7 @@ public class NFA<T> implements Serializable {
                                // a stop state was reached in this branch. 
release branch which results in removing previous event from
                                // the buffer
                                for (final ComputationState<T> state : 
statesToRetain) {
-                                       stringSharedBuffer.release(
+                                       eventSharedBuffer.release(
                                                
state.getPreviousState().getName(),
                                                state.getEvent(),
                                                state.getTimestamp(),
@@ -275,7 +295,7 @@ public class NFA<T> implements Serializable {
 
                                // remove all elements which are expired
                                // with respect to the window length
-                               stringSharedBuffer.prune(pruningTimestamp);
+                               eventSharedBuffer.prune(pruningTimestamp);
                        }
                }
 
@@ -289,7 +309,7 @@ public class NFA<T> implements Serializable {
                        NFA<T> other = (NFA<T>) obj;
 
                        return 
nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
-                               
stringSharedBuffer.equals(other.stringSharedBuffer) &&
+                               
eventSharedBuffer.equals(other.eventSharedBuffer) &&
                                states.equals(other.states) &&
                                windowTime == other.windowTime;
                } else {
@@ -299,7 +319,7 @@ public class NFA<T> implements Serializable {
 
        @Override
        public int hashCode() {
-               return Objects.hash(nonDuplicatingTypeSerializer, 
stringSharedBuffer, states, windowTime);
+               return Objects.hash(nonDuplicatingTypeSerializer, 
eventSharedBuffer, states, windowTime);
        }
 
        private static <T> boolean isEquivalentState(final State<T> s1, final 
State<T> s2) {
@@ -445,14 +465,14 @@ public class NFA<T> implements Serializable {
                                        final long startTimestamp;
                                        if (computationState.isStartState()) {
                                                startTimestamp = timestamp;
-                                               counter = 
stringSharedBuffer.put(
+                                               counter = eventSharedBuffer.put(
                                                        currentState.getName(),
                                                        event,
                                                        timestamp,
                                                        currentVersion);
                                        } else {
                                                startTimestamp = 
computationState.getStartTimestamp();
-                                               counter = 
stringSharedBuffer.put(
+                                               counter = eventSharedBuffer.put(
                                                        currentState.getName(),
                                                        event,
                                                        timestamp,
@@ -502,7 +522,7 @@ public class NFA<T> implements Serializable {
 
                if (computationState.getEvent() != null) {
                        // release the shared entry referenced by the current 
computation state.
-                       stringSharedBuffer.release(
+                       eventSharedBuffer.release(
                                computationState.getPreviousState().getName(),
                                computationState.getEvent(),
                                computationState.getTimestamp(),
@@ -524,7 +544,7 @@ public class NFA<T> implements Serializable {
                ComputationState<T> computationState = 
ComputationState.createState(
                                this, currentState, previousState, event, 
counter, timestamp, version, startTimestamp);
                computationStates.add(computationState);
-               stringSharedBuffer.lock(previousState.getName(), event, 
timestamp, counter);
+               eventSharedBuffer.lock(previousState.getName(), event, 
timestamp, counter);
        }
 
        private State<T> findFinalStateAfterProceed(State<T> state, T event, 
ComputationState<T> computationState) {
@@ -609,7 +629,12 @@ public class NFA<T> implements Serializable {
                        return new HashMap<>();
                }
 
-               Collection<ListMultimap<String, T>> paths = 
stringSharedBuffer.extractPatterns(
+               // the following is used when migrating from previous versions.
+               if (eventSerializer == null) {
+                       eventSerializer = 
nonDuplicatingTypeSerializer.getTypeSerializer();
+               }
+
+               Collection<ListMultimap<String, T>> paths = 
eventSharedBuffer.extractPatterns(
                                computationState.getPreviousState().getName(),
                                computationState.getEvent(),
                                computationState.getTimestamp(),
@@ -619,19 +644,22 @@ public class NFA<T> implements Serializable {
                // for a given computation state, we cannot have more than one 
matching patterns.
                Preconditions.checkState(paths.size() <= 1);
 
-               TypeSerializer<T> serializer = 
nonDuplicatingTypeSerializer.getTypeSerializer();
-
                Map<String, List<T>> result = new HashMap<>();
                for (ListMultimap<String, T> path: paths) {
                        for (String key: path.keySet()) {
                                List<T> events = path.get(key);
 
-                               List<T> values = new ArrayList<>(events.size());
+                               String originalKey = 
NFACompiler.getOriginalStateNameFromInternal(key);
+                               List<T> values = result.get(originalKey);
+                               if (values == null) {
+                                       values = new ArrayList<>(events.size());
+                               }
+
                                for (T event: events) {
                                        // copy the element so that the user 
can change it
-                                       values.add(serializer.isImmutableType() 
? event : serializer.copy(event));
+                                       
values.add(eventSerializer.isImmutableType() ? event : 
eventSerializer.copy(event));
                                }
-                               result.put(key, values);
+                               result.put(originalKey, values);
                        }
                }
                return result;
@@ -639,18 +667,6 @@ public class NFA<T> implements Serializable {
 
        //////////////////////                  Fault-Tolerance / Migration     
                //////////////////////
 
-       private void writeObject(ObjectOutputStream oos) throws IOException {
-               oos.defaultWriteObject();
-
-               oos.writeInt(computationStates.size());
-
-               for(ComputationState<T> computationState: computationStates) {
-                       writeComputationState(computationState, oos);
-               }
-
-               nonDuplicatingTypeSerializer.clearReferences();
-       }
-
        private final static String BEGINNING_STATE_NAME = "$beginningState$";
 
        private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
@@ -676,7 +692,7 @@ public class NFA<T> implements Serializable {
                        try {
                                //Backwards compatibility
                                
this.computationStates.addAll(migrateNFA(readComputationStates));
-                               final Field newSharedBufferField = 
NFA.class.getDeclaredField("stringSharedBuffer");
+                               final Field newSharedBufferField = 
NFA.class.getDeclaredField("eventSharedBuffer");
                                final Field sharedBufferField = 
NFA.class.getDeclaredField("sharedBuffer");
                                sharedBufferField.setAccessible(true);
                                newSharedBufferField.setAccessible(true);
@@ -760,24 +776,6 @@ public class NFA<T> implements Serializable {
                return computationStates;
        }
 
-       private void writeComputationState(final ComputationState<T> 
computationState, final ObjectOutputStream oos) throws IOException {
-               oos.writeObject(computationState.getState());
-               oos.writeObject(computationState.getPreviousState());
-               oos.writeLong(computationState.getTimestamp());
-               oos.writeObject(computationState.getVersion());
-               oos.writeLong(computationState.getStartTimestamp());
-
-               if (computationState.getEvent() == null) {
-                       // write that we don't have an event associated
-                       oos.writeBoolean(false);
-               } else {
-                       // write that we have an event associated
-                       oos.writeBoolean(true);
-                       DataOutputViewStreamWrapper output = new 
DataOutputViewStreamWrapper(oos);
-                       
nonDuplicatingTypeSerializer.serialize(computationState.getEvent(), output);
-               }
-       }
-
        @SuppressWarnings("unchecked")
        private ComputationState<T> readComputationState(ObjectInputStream ois) 
throws IOException, ClassNotFoundException {
                final State<T> state = (State<T>)ois.readObject();
@@ -805,7 +803,397 @@ public class NFA<T> implements Serializable {
                return ComputationState.createState(this, state, previousState, 
event, 0, timestamp, version, startTimestamp);
        }
 
-       //////////////////////                  Serialization                   
//////////////////////
+       //////////////////////                  New Serialization               
        //////////////////////
+
+       /**
+        * The {@link TypeSerializerConfigSnapshot} serializer configuration to 
be stored with the managed state.
+        */
+       public static final class NFASerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+               private static final int VERSION = 1;
+
+               /** This empty constructor is required for deserializing the 
configuration. */
+               public NFASerializerConfigSnapshot() {}
+
+               public NFASerializerConfigSnapshot(
+                               TypeSerializerConfigSnapshot 
sharedBufferSerializerConfigSnapshot,
+                               TypeSerializerConfigSnapshot 
eventSerializerConfigSnapshot) {
+
+                       super(sharedBufferSerializerConfigSnapshot, 
eventSerializerConfigSnapshot);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
+
+       /**
+        * A {@link TypeSerializer} for {@link NFA} that uses Java 
Serialization.
+        */
+       public static class NFASerializer<T> extends TypeSerializer<NFA<T>> {
+
+               private static final long serialVersionUID = 
2098282423980597010L;
+
+               private final TypeSerializer<SharedBuffer<String, T>> 
sharedBufferSerializer;
+
+               private final TypeSerializer<T> eventSerializer;
+
+               public NFASerializer(TypeSerializer<T> typeSerializer) {
+                       this(typeSerializer, new 
SharedBuffer.SharedBufferSerializer<>(StringSerializer.INSTANCE, 
typeSerializer));
+               }
+
+               public NFASerializer(
+                               TypeSerializer<T> typeSerializer,
+                               TypeSerializer<SharedBuffer<String, T>> 
sharedBufferSerializer) {
+                       this.eventSerializer = typeSerializer;
+                       this.sharedBufferSerializer = sharedBufferSerializer;
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return false;
+               }
+
+               @Override
+               public TypeSerializer<NFA<T>> duplicate() {
+                       return this;
+               }
+
+               @Override
+               public NFA<T> createInstance() {
+                       return null;
+               }
+
+               private void readObject(ObjectInputStream ois) throws 
IOException, ClassNotFoundException {
+                       ois.defaultReadObject();
+               }
+
+               @Override
+               public NFA<T> copy(NFA<T> from) {
+                       try {
+                               ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                               ObjectOutputStream oos = new 
ObjectOutputStream(baos);
+
+                               serialize(from, new 
DataOutputViewStreamWrapper(oos));
+
+                               oos.close();
+                               baos.close();
+
+                               byte[] data = baos.toByteArray();
+
+                               ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
+                               ObjectInputStream ois = new 
ObjectInputStream(bais);
+
+                               @SuppressWarnings("unchecked")
+                               NFA<T> copy = deserialize(new 
DataInputViewStreamWrapper(ois));
+                               ois.close();
+                               bais.close();
+                               return copy;
+                       } catch (IOException e) {
+                               throw new RuntimeException("Could not copy 
NFA.", e);
+                       }
+               }
+
+               @Override
+               public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
+                       return copy(from);
+               }
+
+               @Override
+               public int getLength() {
+                       return -1;
+               }
+
+               @Override
+               public void serialize(NFA<T> record, DataOutputView target) 
throws IOException {
+                       serializeStates(record.states, target);
+                       target.writeLong(record.windowTime);
+                       target.writeBoolean(record.handleTimeout);
+                       
+                       
sharedBufferSerializer.serialize(record.eventSharedBuffer, target);
+
+                       target.writeInt(record.computationStates.size());
+
+                       StringSerializer stateNameSerializer = 
StringSerializer.INSTANCE;
+                       LongSerializer timestampSerializer = 
LongSerializer.INSTANCE;
+                       DeweyNumber.DeweyNumberSerializer versionSerializer = 
new DeweyNumber.DeweyNumberSerializer();
+
+                       for (ComputationState<T> computationState: 
record.computationStates) {
+                               
stateNameSerializer.serialize(computationState.getState().getName(), target);
+                               
stateNameSerializer.serialize(computationState.getPreviousState() == null
+                                               ? null : 
computationState.getPreviousState().getName(), target);
+
+                               
timestampSerializer.serialize(computationState.getTimestamp(), target);
+                               
versionSerializer.serialize(computationState.getVersion(), target);
+                               
timestampSerializer.serialize(computationState.getStartTimestamp(), target);
+
+                               if (computationState.getEvent() == null) {
+                                       target.writeBoolean(false);
+                               } else {
+                                       target.writeBoolean(true);
+                                       
eventSerializer.serialize(computationState.getEvent(), target);
+                               }
+                       }
+               }
+
+               @Override
+               public NFA<T> deserialize(DataInputView source) throws 
IOException {
+                       Set<State<T>> states = deserializeStates(source);
+                       long windowTime = source.readLong();
+                       boolean handleTimeout = source.readBoolean();
+                       
+                       NFA<T> nfa = new NFA<>(eventSerializer, windowTime, 
handleTimeout);
+                       nfa.states = states;
+                       
+                       nfa.eventSharedBuffer = 
sharedBufferSerializer.deserialize(source);
+
+                       Queue<ComputationState<T>> computationStates = new 
LinkedList<>();
+                       StringSerializer stateNameSerializer = 
StringSerializer.INSTANCE;
+                       LongSerializer timestampSerializer = 
LongSerializer.INSTANCE;
+                       DeweyNumber.DeweyNumberSerializer versionSerializer = 
new DeweyNumber.DeweyNumberSerializer();
+
+                       int computationStateNo = source.readInt();
+                       for (int i = 0; i < computationStateNo; i++) {
+                               State<T> state = 
getStateByName(stateNameSerializer.deserialize(source), nfa);
+                               State<T> prevState = 
getStateByName(stateNameSerializer.deserialize(source), nfa);
+                               long timestamp = 
timestampSerializer.deserialize(source);
+                               DeweyNumber version = 
versionSerializer.deserialize(source);
+                               long startTimestamp = 
timestampSerializer.deserialize(source);
+
+                               T event = null;
+                               if (source.readBoolean()) {
+                                       event = 
eventSerializer.deserialize(source);
+                               }
+
+                               
computationStates.add(ComputationState.createState(
+                                               nfa, state, prevState, event, 
0, timestamp, version, startTimestamp));
+                       }
+
+                       nfa.computationStates = computationStates;
+                       return nfa;
+               }
+
+               private State<T> getStateByName(String name, NFA<T> nfa) {
+                       for (State<T> state: nfa.states) {
+                               if (state.getName().equals(name)) {
+                                       return state;
+                               }
+                       }
+                       return null;
+               }
+
+               @Override
+               public NFA<T> deserialize(NFA<T> reuse, DataInputView source) 
throws IOException {
+                       return deserialize(source);
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       Set<State<T>> states = deserializeStates(source);
+                       serializeStates(states, target);
+
+                       long windowTime = source.readLong();
+                       target.writeLong(windowTime);
+
+                       boolean handleTimeout = source.readBoolean();
+                       target.writeBoolean(handleTimeout);
+
+                       SharedBuffer<String, T> sharedBuffer = 
sharedBufferSerializer.deserialize(source);
+                       sharedBufferSerializer.serialize(sharedBuffer, target);
+
+                       StringSerializer stateNameSerializer = 
StringSerializer.INSTANCE;
+                       LongSerializer timestampSerializer = 
LongSerializer.INSTANCE;
+                       DeweyNumber.DeweyNumberSerializer versionSerializer = 
new DeweyNumber.DeweyNumberSerializer();
+
+                       int computationStateNo = source.readInt();
+                       target.writeInt(computationStateNo);
+
+                       for (int i = 0; i < computationStateNo; i++) {
+                               String stateName = 
stateNameSerializer.deserialize(source);
+                               stateNameSerializer.serialize(stateName, 
target);
+
+                               String prevStateName = 
stateNameSerializer.deserialize(source);
+                               stateNameSerializer.serialize(prevStateName, 
target);
+
+                               long timestamp = 
timestampSerializer.deserialize(source);
+                               timestampSerializer.serialize(timestamp, 
target);
+
+                               DeweyNumber version = 
versionSerializer.deserialize(source);
+                               versionSerializer.serialize(version, target);
+
+                               long startTimestamp = 
timestampSerializer.deserialize(source);
+                               timestampSerializer.serialize(startTimestamp, 
target);
+
+                               boolean hasEvent = source.readBoolean();
+                               target.writeBoolean(hasEvent);
+                               if (hasEvent) {
+                                       T event = 
eventSerializer.deserialize(source);
+                                       eventSerializer.serialize(event, 
target);
+                               }
+                       }
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return obj == this ||
+                                       (obj != null && 
obj.getClass().equals(getClass()) &&
+                                                       
sharedBufferSerializer.equals(((NFASerializer) obj).sharedBufferSerializer) &&
+                                                       
eventSerializer.equals(((NFASerializer) obj).eventSerializer));
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return true;
+               }
+
+               @Override
+               public int hashCode() {
+                       return 37 * sharedBufferSerializer.hashCode() + 
eventSerializer.hashCode();
+               }
+
+               @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       return new NFASerializerConfigSnapshot(
+                                       eventSerializer.snapshotConfiguration(),
+                                       
sharedBufferSerializer.snapshotConfiguration()
+                       );
+               }
+               public CompatibilityResult<NFA<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       if (configSnapshot instanceof 
NFASerializerConfigSnapshot) {
+                               TypeSerializerConfigSnapshot[] 
serializerConfigSnapshots =
+                                               ((NFASerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
+
+                               CompatibilityResult<T> elementCompatResult =
+                                               
eventSerializer.ensureCompatibility(serializerConfigSnapshots[0]);
+                               CompatibilityResult<SharedBuffer<String, T>> 
sharedBufCompatResult =
+                                               
sharedBufferSerializer.ensureCompatibility(serializerConfigSnapshots[1]);
+
+                               if 
(!sharedBufCompatResult.isRequiresMigration() && 
!elementCompatResult.isRequiresMigration()) {
+                                       return CompatibilityResult.compatible();
+                               } else {
+                                       if 
(elementCompatResult.getConvertDeserializer() != null &&
+                                                       
sharedBufCompatResult.getConvertDeserializer() != null) {
+                                               return 
CompatibilityResult.requiresMigration(
+                                                               new 
NFASerializer<>(
+                                                                               
new TypeDeserializerAdapter<>(elementCompatResult.getConvertDeserializer()),
+                                                                               
new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer())));
+                                       }
+                               }
+                       }
+
+                       return CompatibilityResult.requiresMigration(null);
+               }
+
+               private void serializeStates(Set<State<T>> states, 
DataOutputView out) throws IOException {
+                       TypeSerializer<String> nameSerializer = 
StringSerializer.INSTANCE;
+                       TypeSerializer<State.StateType> stateTypeSerializer = 
new EnumSerializer<>(State.StateType.class);
+                       TypeSerializer<StateTransitionAction> actionSerializer 
= new EnumSerializer<>(StateTransitionAction.class);
+
+                       out.writeInt(states.size());
+                       for (State<T> state: states) {
+                               nameSerializer.serialize(state.getName(), out);
+                               
stateTypeSerializer.serialize(state.getStateType(), out);
+                       }
+
+                       for (State<T> state: states) {
+                               nameSerializer.serialize(state.getName(), out);
+
+                               
out.writeInt(state.getStateTransitions().size());
+                               for (StateTransition<T> transition : 
state.getStateTransitions()) {
+                                       
nameSerializer.serialize(transition.getSourceState().getName(), out);
+                                       
nameSerializer.serialize(transition.getTargetState().getName(), out);
+                                       
actionSerializer.serialize(transition.getAction(), out);
+
+                                       
serializeCondition(transition.getCondition(), out);
+                               }
+                       }
+               }
+
+               private Set<State<T>> deserializeStates(DataInputView in) 
throws IOException {
+                       TypeSerializer<String> nameSerializer = 
StringSerializer.INSTANCE;
+                       TypeSerializer<State.StateType> stateTypeSerializer = 
new EnumSerializer<>(State.StateType.class);
+                       TypeSerializer<StateTransitionAction> actionSerializer 
= new EnumSerializer<>(StateTransitionAction.class);
+
+
+                       final int noOfStates = in.readInt();
+                       Map<String, State<T>> states = new 
HashMap<>(noOfStates);
+
+                       for (int i = 0; i < noOfStates; i++) {
+                               String stateName = 
nameSerializer.deserialize(in);
+                               State.StateType stateType = 
stateTypeSerializer.deserialize(in);
+
+                               State<T> state = new State<>(stateName, 
stateType);
+                               states.put(stateName, state);
+                       }
+
+                       for (int i = 0; i < noOfStates; i++) {
+                               String srcName = nameSerializer.deserialize(in);
+
+                               int noOfTransitions = in.readInt();
+                               for (int j = 0; j < noOfTransitions; j++) {
+                                       String src = 
nameSerializer.deserialize(in);
+                                       
Preconditions.checkState(src.equals(srcName),
+                                                       "Source Edge names do 
not match (" + srcName + " - " + src + ").");
+
+                                       String trgt = 
nameSerializer.deserialize(in);
+                                       StateTransitionAction action = 
actionSerializer.deserialize(in);
+
+                                       IterativeCondition<T> condition = null;
+                                       try {
+                                               condition = 
deserializeCondition(in);
+                                       } catch (ClassNotFoundException e) {
+                                               e.printStackTrace();
+                                       }
+
+                                       State<T> srcState = states.get(src);
+                                       State<T> trgtState = states.get(trgt);
+                                       srcState.addStateTransition(action, 
trgtState, condition);
+                               }
+
+                       }
+                       return new HashSet<>(states.values());
+               }
+
+               private void serializeCondition(IterativeCondition<T> 
condition, DataOutputView out) throws IOException {
+                       out.writeBoolean(condition != null);
+                       if (condition != null) {
+                               ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                               ObjectOutputStream oos = new 
ObjectOutputStream(baos);
+
+                               oos.writeObject(condition);
+
+                               oos.close();
+                               baos.close();
+
+                               byte[] serCondition = baos.toByteArray();
+                               out.writeInt(serCondition.length);
+                               out.write(serCondition);
+                       }
+               }
+
+               private IterativeCondition<T> 
deserializeCondition(DataInputView in) throws IOException, 
ClassNotFoundException {
+                       boolean hasCondition = in.readBoolean();
+                       if (hasCondition) {
+                               int length = in.readInt();
+
+                               byte[] serCondition = new byte[length];
+                               in.read(serCondition);
+
+                               ByteArrayInputStream bais = new 
ByteArrayInputStream(serCondition);
+                               ObjectInputStream ois = new 
ObjectInputStream(bais);
+
+                               IterativeCondition<T> condition = 
(IterativeCondition<T>) ois.readObject();
+                               ois.close();
+                               bais.close();
+
+                               return condition;
+                       }
+                       return null;
+               }
+       }
+
+       //////////////////                      Old Serialization               
        //////////////////////
 
        /**
         * A {@link TypeSerializer} for {@link NFA} that uses Java 
Serialization.
@@ -862,10 +1250,7 @@ public class NFA<T> implements Serializable {
 
                @Override
                public void serialize(NFA<T> record, DataOutputView target) 
throws IOException {
-                       try (ObjectOutputStream oos = new 
ObjectOutputStream(new DataOutputViewStream(target))) {
-                               oos.writeObject(record);
-                               oos.flush();
-                       }
+                       throw new UnsupportedOperationException("This is the 
deprecated serialization strategy.");
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index dcf5665..ab134d0 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -22,10 +22,20 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -35,6 +45,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -63,6 +74,10 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
 
        private static final long serialVersionUID = 9213251042562206495L;
 
+       /**
+        * @deprecated This serializer is only used for backwards compatibility.
+        */
+       @Deprecated
        private final TypeSerializer<V> valueSerializer;
 
        private transient Map<K, SharedBufferPage<K, V>> pages;
@@ -72,6 +87,12 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                this.pages = new HashMap<>();
        }
 
+       public TypeSerializer<V> getValueSerializer() {
+               return (valueSerializer instanceof NonDuplicatingTypeSerializer)
+                               ? ((NonDuplicatingTypeSerializer) 
valueSerializer).getTypeSerializer()
+                               : valueSerializer;
+       }
+
        /**
         * Stores given value (value + timestamp) under the given key. It 
assigns a preceding element
         * relation to the entry which is defined by the previous key, value 
(value + timestamp).
@@ -293,155 +314,6 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                }
        }
 
-       private void writeObject(ObjectOutputStream oos) throws IOException {
-               DataOutputViewStreamWrapper target = new 
DataOutputViewStreamWrapper(oos);
-               Map<SharedBufferEntry<K, V>, Integer> entryIDs = new 
HashMap<>();
-               int totalEdges = 0;
-               int entryCounter = 0;
-
-               oos.defaultWriteObject();
-
-               // number of pages
-               oos.writeInt(pages.size());
-
-               for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: 
pages.entrySet()) {
-                       SharedBufferPage<K, V> page = pageEntry.getValue();
-
-                       // key for the current page
-                       oos.writeObject(page.getKey());
-                       // number of page entries
-                       oos.writeInt(page.entries.size());
-
-                       for (Map.Entry<ValueTimeWrapper<V>, 
SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
-                               // serialize the sharedBufferEntry
-                               SharedBufferEntry<K, V> sharedBuffer = 
sharedBufferEntry.getValue();
-
-                               // assign id to the sharedBufferEntry for the 
future serialization of the previous
-                               // relation
-                               entryIDs.put(sharedBuffer, entryCounter++);
-
-                               ValueTimeWrapper<V> valueTimeWrapper = 
sharedBuffer.getValueTime();
-
-                               
valueSerializer.serialize(valueTimeWrapper.value, target);
-                               oos.writeLong(valueTimeWrapper.getTimestamp());
-                               oos.writeInt(valueTimeWrapper.getCounter());
-
-                               int edges = sharedBuffer.edges.size();
-                               totalEdges += edges;
-
-                               oos.writeInt(sharedBuffer.referenceCounter);
-                       }
-               }
-
-               // write the edges between the shared buffer entries
-               oos.writeInt(totalEdges);
-
-               for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: 
pages.entrySet()) {
-                       SharedBufferPage<K, V> page = pageEntry.getValue();
-
-                       for (Map.Entry<ValueTimeWrapper<V>, 
SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
-                               SharedBufferEntry<K, V> sharedBuffer = 
sharedBufferEntry.getValue();
-
-                               if (!entryIDs.containsKey(sharedBuffer)) {
-                                       throw new RuntimeException("Could not 
find id for entry: " + sharedBuffer);
-                               } else {
-                                       int id = entryIDs.get(sharedBuffer);
-
-                                       for (SharedBufferEdge<K, V> edge: 
sharedBuffer.edges) {
-                                               // in order to serialize the 
previous relation we simply serialize the ids
-                                               // of the source and target 
SharedBufferEntry
-                                               if (edge.target != null) {
-                                                       if 
(!entryIDs.containsKey(edge.getTarget())) {
-                                                               throw new 
RuntimeException("Could not find id for entry: " + edge.getTarget());
-                                                       } else {
-                                                               int targetId = 
entryIDs.get(edge.getTarget());
-
-                                                               
oos.writeInt(id);
-                                                               
oos.writeInt(targetId);
-                                                               
oos.writeObject(edge.version);
-                                                       }
-                                               } else {
-                                                       oos.writeInt(id);
-                                                       oos.writeInt(-1);
-                                                       
oos.writeObject(edge.version);
-                                               }
-                                       }
-                               }
-                       }
-               }
-       }
-
-       private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
-               DataInputViewStreamWrapper source = new 
DataInputViewStreamWrapper(ois);
-               ArrayList<SharedBufferEntry<K, V>> entryList = new 
ArrayList<>();
-               ois.defaultReadObject();
-
-               this.pages = new HashMap<>();
-
-               int numberPages = ois.readInt();
-
-               for (int i = 0; i < numberPages; i++) {
-                       // key of the page
-                       @SuppressWarnings("unchecked")
-                       K key = (K)ois.readObject();
-
-                       SharedBufferPage<K, V> page = new 
SharedBufferPage<>(key);
-
-                       pages.put(key, page);
-
-                       int numberEntries = ois.readInt();
-
-                       for (int j = 0; j < numberEntries; j++) {
-                               // restore the SharedBufferEntries for the 
given page
-                               V value = valueSerializer.deserialize(source);
-                               long timestamp = ois.readLong();
-                               int counter = ois.readInt();
-
-                               ValueTimeWrapper<V> valueTimeWrapper = new 
ValueTimeWrapper<>(value, timestamp, counter);
-                               SharedBufferEntry<K, V> sharedBufferEntry = new 
SharedBufferEntry<K, V>(valueTimeWrapper, page);
-
-                               sharedBufferEntry.referenceCounter = 
ois.readInt();
-
-                               page.entries.put(valueTimeWrapper, 
sharedBufferEntry);
-
-                               entryList.add(sharedBufferEntry);
-                       }
-               }
-
-               // read the edges of the shared buffer entries
-               int numberEdges = ois.readInt();
-
-               for (int j = 0; j < numberEdges; j++) {
-                       int sourceIndex = ois.readInt();
-                       int targetIndex = ois.readInt();
-
-                       if (sourceIndex >= entryList.size() || sourceIndex < 0) 
{
-                               throw new RuntimeException("Could not find 
source entry with index " + sourceIndex +
-                                       ". This indicates a corrupted state.");
-                       } else {
-                               // We've already deserialized the shared buffer 
entry. Simply read its ID and
-                               // retrieve the buffer entry from the list of 
entries
-                               SharedBufferEntry<K, V> sourceEntry = 
entryList.get(sourceIndex);
-
-                               final DeweyNumber version = (DeweyNumber) 
ois.readObject();
-                               final SharedBufferEntry<K, V> target;
-
-                               if (targetIndex >= 0) {
-                                       if (targetIndex >= entryList.size()) {
-                                               throw new 
RuntimeException("Could not find target entry with index " + targetIndex +
-                                                       ". This indicates a 
corrupted state.");
-                                       } else {
-                                               target = 
entryList.get(targetIndex);
-                                       }
-                               } else {
-                                       target = null;
-                               }
-
-                               sourceEntry.edges.add(new SharedBufferEdge<K, 
V>(target, version));
-                       }
-               }
-       }
-
        private SharedBuffer(
                TypeSerializer<V> valueSerializer,
                Map<K, SharedBufferPage<K, V>> pages) {
@@ -523,7 +395,7 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
        public String toString() {
                StringBuilder builder = new StringBuilder();
 
-               for(Map.Entry<K, SharedBufferPage<K, V>> entry 
:pages.entrySet()){
+               for(Map.Entry<K, SharedBufferPage<K, V>> entry: 
pages.entrySet()){
                        builder.append("Key: 
").append(entry.getKey()).append("\n");
                        builder.append("Value: 
").append(entry.getValue()).append("\n");
                }
@@ -537,7 +409,7 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                        @SuppressWarnings("unchecked")
                        SharedBuffer<K, V> other = (SharedBuffer<K, V>) obj;
 
-                       return pages.equals(other.pages) && 
valueSerializer.equals(other.valueSerializer);
+                       return pages.equals(other.pages) && 
getValueSerializer().equals(other.getValueSerializer());
                } else {
                        return false;
                }
@@ -545,7 +417,7 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
 
        @Override
        public int hashCode() {
-               return Objects.hash(pages, valueSerializer);
+               return Objects.hash(pages, getValueSerializer());
        }
 
        /**
@@ -931,4 +803,424 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                        return "ExtractionState(" + entry + ", " + version + ", 
[" +  StringUtils.join(path, ", ") + "])";
                }
        }
+
+       //////////////                          New Serialization               
                ////////////////////
+
+       /**
+        * The {@link TypeSerializerConfigSnapshot} serializer configuration to 
be stored with the managed state.
+        */
+       public static final class SharedBufferSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+               private static final int VERSION = 1;
+
+               /** This empty constructor is required for deserializing the 
configuration. */
+               public SharedBufferSerializerConfigSnapshot() {}
+
+               public SharedBufferSerializerConfigSnapshot(
+                               TypeSerializerConfigSnapshot 
keySerializerConfigSnapshot,
+                               TypeSerializerConfigSnapshot 
valueSerializerConfigSnapshot,
+                               TypeSerializerConfigSnapshot 
versionSerializerConfigSnapshot) {
+
+                       super(keySerializerConfigSnapshot, 
valueSerializerConfigSnapshot, versionSerializerConfigSnapshot);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
+
+       /**
+        * A {@link TypeSerializer} for the {@link SharedBuffer}.
+        */
+       public static class SharedBufferSerializer<K extends Serializable, V> 
extends TypeSerializer<SharedBuffer<K, V>> {
+
+               private static final long serialVersionUID = 
-3254176794680331560L;
+
+               private final TypeSerializer<K> keySerializer;
+               private final TypeSerializer<V> valueSerializer;
+               private final TypeSerializer<DeweyNumber> versionSerializer;
+
+               public SharedBufferSerializer(
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<V> valueSerializer) {
+                       this(keySerializer, valueSerializer, new 
DeweyNumber.DeweyNumberSerializer());
+               }
+
+               public SharedBufferSerializer(
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<V> valueSerializer,
+                               TypeSerializer<DeweyNumber> versionSerializer) {
+
+                       this.keySerializer = keySerializer;
+                       this.valueSerializer = valueSerializer;
+                       this.versionSerializer = versionSerializer;
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return false;
+               }
+
+               @Override
+               public TypeSerializer<SharedBuffer<K, V>> duplicate() {
+                       return new SharedBufferSerializer<>(keySerializer, 
valueSerializer);
+               }
+
+               @Override
+               public SharedBuffer<K, V> createInstance() {
+                       return new SharedBuffer<>(new 
NonDuplicatingTypeSerializer<V>(valueSerializer));
+               }
+
+               @Override
+               public SharedBuffer<K, V> copy(SharedBuffer from) {
+                       try {
+                               ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                               ObjectOutputStream oos = new 
ObjectOutputStream(baos);
+
+                               serialize(from, new 
DataOutputViewStreamWrapper(oos));
+
+                               oos.close();
+                               baos.close();
+
+                               byte[] data = baos.toByteArray();
+
+                               ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
+                               ObjectInputStream ois = new 
ObjectInputStream(bais);
+
+                               @SuppressWarnings("unchecked")
+                               SharedBuffer<K, V> copy = deserialize(new 
DataInputViewStreamWrapper(ois));
+                               ois.close();
+                               bais.close();
+
+                               return copy;
+                       } catch (IOException e) {
+                               throw new RuntimeException("Could not copy 
SharredBuffer.", e);
+                       }
+               }
+
+               @Override
+               public SharedBuffer<K, V> copy(SharedBuffer from, SharedBuffer 
reuse) {
+                       return copy(from);
+               }
+
+               @Override
+               public int getLength() {
+                       return -1;
+               }
+
+               @Override
+               public void serialize(SharedBuffer record, DataOutputView 
target) throws IOException {
+                       Map<K, SharedBufferPage<K, V>> pages = record.pages;
+                       Map<SharedBufferEntry<K, V>, Integer> entryIDs = new 
HashMap<>();
+
+                       int totalEdges = 0;
+                       int entryCounter = 0;
+
+                       // number of pages
+                       target.writeInt(pages.size());
+
+                       for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: 
pages.entrySet()) {
+                               SharedBufferPage<K, V> page = 
pageEntry.getValue();
+
+                               // key for the current page
+                               keySerializer.serialize(page.getKey(), target);
+                               
+                               // number of page entries
+                               target.writeInt(page.entries.size());
+
+                               for (Map.Entry<ValueTimeWrapper<V>, 
SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
+                                       SharedBufferEntry<K, V> sharedBuffer = 
sharedBufferEntry.getValue();
+
+                                       // assign id to the sharedBufferEntry 
for the future
+                                       // serialization of the previous 
relation
+                                       entryIDs.put(sharedBuffer, 
entryCounter++);
+
+                                       ValueTimeWrapper<V> valueTimeWrapper = 
sharedBuffer.getValueTime();
+
+                                       
valueSerializer.serialize(valueTimeWrapper.value, target);
+                                       
target.writeLong(valueTimeWrapper.getTimestamp());
+                                       
target.writeInt(valueTimeWrapper.getCounter());
+
+                                       int edges = sharedBuffer.edges.size();
+                                       totalEdges += edges;
+
+                                       
target.writeInt(sharedBuffer.referenceCounter);
+                               }
+                       }
+
+                       // write the edges between the shared buffer entries
+                       target.writeInt(totalEdges);
+
+                       for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: 
pages.entrySet()) {
+                               SharedBufferPage<K, V> page = 
pageEntry.getValue();
+
+                               for (Map.Entry<ValueTimeWrapper<V>, 
SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
+                                       SharedBufferEntry<K, V> sharedBuffer = 
sharedBufferEntry.getValue();
+
+                                       Integer id = entryIDs.get(sharedBuffer);
+                                       Preconditions.checkState(id != null, 
"Could not find id for entry: " + sharedBuffer);
+
+                                       for (SharedBufferEdge<K, V> edge: 
sharedBuffer.edges) {
+                                               // in order to serialize the 
previous relation we simply serialize the ids
+                                               // of the source and target 
SharedBufferEntry
+                                               if (edge.target != null) {
+                                                       Integer targetId = 
entryIDs.get(edge.getTarget());
+                                                       
Preconditions.checkState(targetId != null,
+                                                                       "Could 
not find id for entry: " + edge.getTarget());
+
+                                                       target.writeInt(id);
+                                                       
target.writeInt(targetId);
+                                                       
versionSerializer.serialize(edge.version, target);
+                                               } else {
+                                                       target.writeInt(id);
+                                                       target.writeInt(-1);
+                                                       
versionSerializer.serialize(edge.version, target);
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               @Override
+               public SharedBuffer deserialize(DataInputView source) throws 
IOException {
+                       List<SharedBufferEntry<K, V>> entryList = new 
ArrayList<>();
+                       Map<K, SharedBufferPage<K, V>> pages = new HashMap<>();
+
+                       int totalPages = source.readInt();
+
+                       for (int i = 0; i < totalPages; i++) {
+                               // key of the page
+                               @SuppressWarnings("unchecked")
+                               K key = keySerializer.deserialize(source);
+
+                               SharedBufferPage<K, V> page = new 
SharedBufferPage<>(key);
+
+                               pages.put(key, page);
+
+                               int numberEntries = source.readInt();
+
+                               for (int j = 0; j < numberEntries; j++) {
+                                       // restore the SharedBufferEntries for 
the given page
+                                       V value = 
valueSerializer.deserialize(source);
+                                       long timestamp = source.readLong();
+                                       int counter = source.readInt();
+
+                                       ValueTimeWrapper<V> valueTimeWrapper = 
new ValueTimeWrapper<>(value, timestamp, counter);
+                                       SharedBufferEntry<K, V> 
sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page);
+
+                                       sharedBufferEntry.referenceCounter = 
source.readInt();
+
+                                       page.entries.put(valueTimeWrapper, 
sharedBufferEntry);
+
+                                       entryList.add(sharedBufferEntry);
+                               }
+                       }
+
+                       // read the edges of the shared buffer entries
+                       int totalEdges = source.readInt();
+
+                       for (int j = 0; j < totalEdges; j++) {
+                               int sourceIndex = source.readInt();
+                               Preconditions.checkState(sourceIndex < 
entryList.size() && sourceIndex >= 0,
+                                               "Could not find source entry 
with index " + sourceIndex +       ". This indicates a corrupted state.");
+
+                               int targetIndex = source.readInt();
+                               Preconditions.checkState(targetIndex < 
entryList.size(),
+                                               "Could not find target entry 
with index " + sourceIndex +       ". This indicates a corrupted state.");
+
+                               DeweyNumber version = 
versionSerializer.deserialize(source);
+
+                               // We've already deserialized the shared buffer 
entry. Simply read its ID and
+                               // retrieve the buffer entry from the list of 
entries
+                               SharedBufferEntry<K, V> sourceEntry = 
entryList.get(sourceIndex);
+                               SharedBufferEntry<K, V> targetEntry = 
targetIndex < 0 ? null : entryList.get(targetIndex);
+
+                               sourceEntry.edges.add(new 
SharedBufferEdge<>(targetEntry, version));
+                       }
+                       // here we put the old NonDuplicating serializer 
because this needs to create a copy
+                       // of the buffer, as created by the NFA. There, for 
compatibility reasons, we have left
+                       // the old serializer.
+                       return new SharedBuffer(new 
NonDuplicatingTypeSerializer(valueSerializer), pages);
+               }
+
+               @Override
+               public SharedBuffer deserialize(SharedBuffer reuse, 
DataInputView source) throws IOException {
+                       return deserialize(source);
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       int numberPages = source.readInt();
+                       target.writeInt(numberPages);
+
+                       for (int i = 0; i < numberPages; i++) {
+                               // key of the page
+                               @SuppressWarnings("unchecked")
+                               K key = keySerializer.deserialize(source);
+                               keySerializer.serialize(key, target);
+
+                               int numberEntries = source.readInt();
+
+                               for (int j = 0; j < numberEntries; j++) {
+                                       // restore the SharedBufferEntries for 
the given page
+                                       V value = 
valueSerializer.deserialize(source);
+                                       valueSerializer.serialize(value, 
target);
+
+                                       long timestamp = source.readLong();
+                                       target.writeLong(timestamp);
+
+                                       int counter = source.readInt();
+                                       target.writeInt(counter);
+
+                                       int referenceCounter = source.readInt();
+                                       target.writeInt(referenceCounter);
+                               }
+                       }
+
+                       // read the edges of the shared buffer entries
+                       int numberEdges = source.readInt();
+                       target.writeInt(numberEdges);
+
+                       for (int j = 0; j < numberEdges; j++) {
+                               int sourceIndex = source.readInt();
+                               int targetIndex = source.readInt();
+
+                               target.writeInt(sourceIndex);
+                               target.writeInt(targetIndex);
+
+                               DeweyNumber version = 
versionSerializer.deserialize(source);
+                               versionSerializer.serialize(version, target);
+                       }
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return obj == this ||
+                                       (obj != null && 
obj.getClass().equals(getClass()) &&
+                                                       
keySerializer.equals(((SharedBufferSerializer<?, ?>) obj).keySerializer) &&
+                                                       
valueSerializer.equals(((SharedBufferSerializer<?, ?>) obj).valueSerializer) &&
+                                                       
versionSerializer.equals(((SharedBufferSerializer<?, ?>) 
obj).versionSerializer));
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return true;
+               }
+
+               @Override
+               public int hashCode() {
+                       return 37 * keySerializer.hashCode() + 
valueSerializer.hashCode();
+               }
+
+               @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       return new SharedBufferSerializerConfigSnapshot(
+                                       keySerializer.snapshotConfiguration(),
+                                       valueSerializer.snapshotConfiguration(),
+                                       
versionSerializer.snapshotConfiguration()
+                       );
+               }
+
+               @Override
+               public CompatibilityResult<SharedBuffer<K, V>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       if (configSnapshot instanceof 
SharedBufferSerializerConfigSnapshot) {
+                               TypeSerializerConfigSnapshot[] 
serializerConfigSnapshots =
+                                               
((SharedBufferSerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
+
+                               CompatibilityResult<K> keyCompatResult = 
keySerializer.ensureCompatibility(serializerConfigSnapshots[0]);
+                               CompatibilityResult<V> valueCompatResult = 
valueSerializer.ensureCompatibility(serializerConfigSnapshots[1]);
+                               CompatibilityResult<DeweyNumber> 
versionCompatResult = 
versionSerializer.ensureCompatibility(serializerConfigSnapshots[2]);
+
+                               if (!keyCompatResult.isRequiresMigration() && 
!valueCompatResult.isRequiresMigration() && 
!versionCompatResult.isRequiresMigration()) {
+                                       return CompatibilityResult.compatible();
+                               } else {
+                                       if 
(keyCompatResult.getConvertDeserializer() != null
+                                                       && 
valueCompatResult.getConvertDeserializer() != null
+                                                       && 
versionCompatResult.getConvertDeserializer() != null) {
+                                               return 
CompatibilityResult.requiresMigration(
+                                                               new 
SharedBufferSerializer<>(
+                                                                               
new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
+                                                                               
new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()),
+                                                                               
new TypeDeserializerAdapter<>(versionCompatResult.getConvertDeserializer())
+                                                               ));
+                                       }
+                               }
+                       }
+
+                       return CompatibilityResult.requiresMigration(null);
+               }
+       }
+
+       //////////////////                      Java Serialization methods for 
backwards compatibility                  //////////////////
+
+       private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
+               DataInputViewStreamWrapper source = new 
DataInputViewStreamWrapper(ois);
+               ArrayList<SharedBufferEntry<K, V>> entryList = new 
ArrayList<>();
+               ois.defaultReadObject();
+
+               this.pages = new HashMap<>();
+
+               int numberPages = ois.readInt();
+
+               for (int i = 0; i < numberPages; i++) {
+                       // key of the page
+                       @SuppressWarnings("unchecked")
+                       K key = (K)ois.readObject();
+
+                       SharedBufferPage<K, V> page = new 
SharedBufferPage<>(key);
+
+                       pages.put(key, page);
+
+                       int numberEntries = ois.readInt();
+
+                       for (int j = 0; j < numberEntries; j++) {
+                               // restore the SharedBufferEntries for the 
given page
+                               V value = valueSerializer.deserialize(source);
+                               long timestamp = ois.readLong();
+
+                               ValueTimeWrapper<V> valueTimeWrapper = new 
ValueTimeWrapper<>(value, timestamp, 0);
+                               SharedBufferEntry<K, V> sharedBufferEntry = new 
SharedBufferEntry<K, V>(valueTimeWrapper, page);
+
+                               sharedBufferEntry.referenceCounter = 
ois.readInt();
+
+                               page.entries.put(valueTimeWrapper, 
sharedBufferEntry);
+
+                               entryList.add(sharedBufferEntry);
+                       }
+               }
+
+               // read the edges of the shared buffer entries
+               int numberEdges = ois.readInt();
+
+               for (int j = 0; j < numberEdges; j++) {
+                       int sourceIndex = ois.readInt();
+                       int targetIndex = ois.readInt();
+
+                       if (sourceIndex >= entryList.size() || sourceIndex < 0) 
{
+                               throw new RuntimeException("Could not find 
source entry with index " + sourceIndex +
+                                               ". This indicates a corrupted 
state.");
+                       } else {
+                               // We've already deserialized the shared buffer 
entry. Simply read its ID and
+                               // retrieve the buffer entry from the list of 
entries
+                               SharedBufferEntry<K, V> sourceEntry = 
entryList.get(sourceIndex);
+
+                               final DeweyNumber version = (DeweyNumber) 
ois.readObject();
+                               final SharedBufferEntry<K, V> target;
+
+                               if (targetIndex >= 0) {
+                                       if (targetIndex >= entryList.size()) {
+                                               throw new 
RuntimeException("Could not find target entry with index " + targetIndex +
+                                                               ". This 
indicates a corrupted state.");
+                                       } else {
+                                               target = 
entryList.get(targetIndex);
+                                       }
+                               } else {
+                                       target = null;
+                               }
+
+                               sourceEntry.edges.add(new SharedBufferEdge<K, 
V>(target, version));
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 3d11538..14395b1 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -146,6 +146,8 @@ public class State<T> implements Serializable {
                Stop
        }
 
+       ////////////////                        Backwards Compatibility         
        ////////////////////
+
        private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
                ois.defaultReadObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 39c18b9..1b31485 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Compiler class containing methods to compile a {@link Pattern} into a 
{@link NFA} or a
@@ -53,6 +54,8 @@ public class NFACompiler {
 
        protected static final String ENDING_STATE_NAME = "$endState$";
 
+       protected static final String STATE_NAME_DELIM = ":";
+
        /**
         * Compiles the given pattern into a {@link NFA}.
         *
@@ -71,6 +74,11 @@ public class NFACompiler {
                return factory.createNFA();
        }
 
+       public static String getOriginalStateNameFromInternal(String 
internalName) {
+               Preconditions.checkNotNull(internalName);
+               return internalName.split(STATE_NAME_DELIM)[0];
+       }
+
        /**
         * Compiles the given pattern into a {@link NFAFactory}. The NFA 
factory can be used to create
         * multiple NFAs.
@@ -178,10 +186,7 @@ public class NFACompiler {
                 * @return dummy Final state
                 */
                private State<T> createEndingState() {
-                       checkPatternNameUniqueness(ENDING_STATE_NAME);
-                       State<T> endState = new State<>(ENDING_STATE_NAME, 
State.StateType.Final);
-                       states.add(endState);
-
+                       State<T> endState = createState(ENDING_STATE_NAME, 
State.StateType.Final);
                        windowTime = currentPattern.getWindowTime() != null ? 
currentPattern.getWindowTime().toMilliseconds() : 0L;
                        return endState;
                }
@@ -199,7 +204,8 @@ public class NFACompiler {
                                if 
(currentPattern.getQuantifier().getConsumingStrategy() == 
Quantifier.ConsumingStrategy.NOT_FOLLOW) {
                                        //skip notFollow patterns, they are 
converted into edge conditions
                                } else if 
(currentPattern.getQuantifier().getConsumingStrategy() == 
Quantifier.ConsumingStrategy.NOT_NEXT) {
-                                       final State<T> notNext = 
createNormalState();
+                                       
checkPatternNameUniqueness(currentPattern.getName());
+                                       final State<T> notNext = 
createState(currentPattern.getName(), State.StateType.Normal);
                                        final IterativeCondition<T> 
notCondition = (IterativeCondition<T>) currentPattern.getCondition();
                                        final State<T> stopState = 
createStopState(notCondition, currentPattern.getName());
 
@@ -212,6 +218,7 @@ public class NFACompiler {
                                        notNext.addProceed(stopState, 
notCondition);
                                        lastSink = notNext;
                                } else {
+                                       
checkPatternNameUniqueness(currentPattern.getName());
                                        lastSink = convertPattern(lastSink);
                                }
 
@@ -236,6 +243,7 @@ public class NFACompiler {
                 */
                @SuppressWarnings("unchecked")
                private State<T> createStartState(State<T> sinkState) {
+                       checkPatternNameUniqueness(currentPattern.getName());
                        final State<T> beginningState = 
convertPattern(sinkState);
                        beginningState.makeStart();
                        return beginningState;
@@ -243,7 +251,6 @@ public class NFACompiler {
 
                private State<T> convertPattern(final State<T> sinkState) {
                        final State<T> lastSink;
-                       checkPatternNameUniqueness(currentPattern.getName());
 
                        final Quantifier quantifier = 
currentPattern.getQuantifier();
                        if 
(quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
@@ -273,18 +280,42 @@ public class NFACompiler {
                 *
                 * @return the created state
                 */
-               private State<T> createNormalState() {
-                       final State<T> state = new 
State<>(currentPattern.getName(), State.StateType.Normal);
+               private State<T> createState(String name, State.StateType 
stateType) {
+                       String stateName = getUniqueInternalStateName(name);
+                       usedNames.add(stateName);
+                       State<T> state = new State<>(stateName, stateType);
                        states.add(state);
                        return state;
                }
 
+               /**
+                * Used to give a unique name to states created
+                * during the translation process.
+                *
+                * @param baseName The base of the name.
+                */
+               private String getUniqueInternalStateName(String baseName) {
+                       int counter = 0;
+                       String candidate = baseName;
+                       while (usedNames.contains(candidate)) {
+                               candidate = baseName + STATE_NAME_DELIM + 
counter++;
+                       }
+                       return candidate;
+               }
+
+               private void checkPatternNameUniqueness(String patternName) {
+                       if (usedNames.contains(patternName)) {
+                               throw new MalformedPatternException(
+                                               "Duplicate pattern name: " + 
patternName + ". " +
+                                                               "Pattern names 
must be unique.");
+                       }
+               }
+
                private State<T> createStopState(final IterativeCondition<T> 
notCondition, final String name) {
                        // We should not duplicate the notStates. All states 
from which we can stop should point to the same one.
                        State<T> stopState = stopStates.get(name);
                        if (stopState == null) {
-                               stopState = new State<>(name, 
State.StateType.Stop);
-                               states.add(stopState);
+                               stopState = createState(name, 
State.StateType.Stop);
                                stopState.addTake(notCondition);
                                stopStates.put(name, stopState);
                        }
@@ -313,8 +344,7 @@ public class NFACompiler {
                                return sinkState;
                        }
 
-                       final State<T> copyOfSink = new 
State<>(sinkState.getName(), sinkState.getStateType());
-                       states.add(copyOfSink);
+                       final State<T> copyOfSink = 
createState(sinkState.getName(), sinkState.getStateType());
 
                        for (StateTransition<T> tStateTransition : 
sinkState.getStateTransitions()) {
 
@@ -364,15 +394,6 @@ public class NFACompiler {
                        }
                }
 
-               private void checkPatternNameUniqueness(String patternName) {
-                       if (usedNames.contains(currentPattern.getName())) {
-                               throw new MalformedPatternException(
-                                       "Duplicate pattern name: " + 
patternName + ". " +
-                                       "Pattern names must be unique.");
-                       }
-                       usedNames.add(patternName);
-               }
-
                /**
                 * Creates a "complex" state consisting of given number of 
states with
                 * same {@link IterativeCondition}
@@ -396,12 +417,12 @@ public class NFACompiler {
                                return createSingletonState(lastSink, 
ignoreCondition, false);
                        }
 
-                       final State<T> singletonState = createNormalState();
+                       final State<T> singletonState = 
createState(currentPattern.getName(), State.StateType.Normal);
                        singletonState.addTake(lastSink, currentCondition);
                        singletonState.addProceed(sinkState, 
BooleanConditions.<T>trueFunction());
 
                        if (ignoreCondition != null) {
-                               State<T> ignoreState = createNormalState();
+                               State<T> ignoreState = 
createState(currentPattern.getName(), State.StateType.Normal);
                                ignoreState.addTake(lastSink, currentCondition);
                                ignoreState.addIgnore(ignoreCondition);
                                singletonState.addIgnore(ignoreState, 
ignoreCondition);
@@ -440,7 +461,7 @@ public class NFACompiler {
                        final IterativeCondition<T> currentCondition = 
(IterativeCondition<T>) currentPattern.getCondition();
                        final IterativeCondition<T> trueFunction = 
BooleanConditions.trueFunction();
 
-                       final State<T> singletonState = createNormalState();
+                       final State<T> singletonState = 
createState(currentPattern.getName(), State.StateType.Normal);
                        // if event is accepted then all notPatterns previous 
to the optional states are no longer valid
                        final State<T> sink = 
copyWithoutTransitiveNots(sinkState);
                        singletonState.addTake(sink, currentCondition);
@@ -453,7 +474,7 @@ public class NFACompiler {
                        if (ignoreCondition != null) {
                                final State<T> ignoreState;
                                if (isOptional) {
-                                       ignoreState = createNormalState();
+                                       ignoreState = 
createState(currentPattern.getName(), State.StateType.Normal);
                                        ignoreState.addTake(sink, 
currentCondition);
                                        ignoreState.addIgnore(ignoreCondition);
                                        addStopStates(ignoreState);
@@ -479,14 +500,14 @@ public class NFACompiler {
                        final IterativeCondition<T> ignoreCondition = 
getInnerIgnoreCondition(currentPattern);
                        final IterativeCondition<T> trueFunction = 
BooleanConditions.trueFunction();
 
-                       final State<T> loopingState = createNormalState();
+                       final State<T> loopingState = 
createState(currentPattern.getName(), State.StateType.Normal);
                        loopingState.addProceed(sinkState, trueFunction);
                        loopingState.addTake(currentCondition);
 
                        addStopStateToLooping(loopingState);
 
                        if (ignoreCondition != null) {
-                               final State<T> ignoreState = 
createNormalState();
+                               final State<T> ignoreState = 
createState(currentPattern.getName(), State.StateType.Normal);
                                ignoreState.addTake(loopingState, 
currentCondition);
                                ignoreState.addIgnore(ignoreCondition);
                                loopingState.addIgnore(ignoreState, 
ignoreCondition);
@@ -507,7 +528,7 @@ public class NFACompiler {
                private State<T> createInitMandatoryStateOfOneOrMore(final 
State<T> sinkState) {
                        final IterativeCondition<T> currentCondition = 
(IterativeCondition<T>) currentPattern.getCondition();
 
-                       final State<T> firstState = createNormalState();
+                       final State<T> firstState = 
createState(currentPattern.getName(), State.StateType.Normal);
                        firstState.addTake(sinkState, currentCondition);
 
                        final IterativeCondition<T> ignoreCondition = 
getIgnoreCondition(currentPattern);
@@ -528,13 +549,13 @@ public class NFACompiler {
                private State<T> createInitOptionalStateOfZeroOrMore(final 
State<T> loopingState, final State<T> lastSink) {
                        final IterativeCondition<T> currentCondition = 
(IterativeCondition<T>) currentPattern.getCondition();
 
-                       final State<T> firstState = createNormalState();
+                       final State<T> firstState = 
createState(currentPattern.getName(), State.StateType.Normal);
                        firstState.addProceed(lastSink, 
BooleanConditions.<T>trueFunction());
                        firstState.addTake(loopingState, currentCondition);
 
                        final IterativeCondition<T> ignoreFunction = 
getIgnoreCondition(currentPattern);
                        if (ignoreFunction != null) {
-                               final State<T> firstStateWithoutProceed = 
createNormalState();
+                               final State<T> firstStateWithoutProceed = 
createState(currentPattern.getName(), State.StateType.Normal);
                                firstState.addIgnore(firstStateWithoutProceed, 
ignoreFunction);
                                
firstStateWithoutProceed.addIgnore(ignoreFunction);
                                firstStateWithoutProceed.addTake(loopingState, 
currentCondition);

http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index bac21b3..2ed7245 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -82,7 +82,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
 
        ///////////////                 State                   //////////////
 
-       private static final String NFA_OPERATOR_STATE_NAME = 
"nfaOperatorState";
+       private static final String NFA_OPERATOR_STATE_NAME = 
"nfaOperatorStateName";
        private static final String PRIORITY_QUEUE_STATE_NAME = 
"priorityQueueStateName";
 
        private transient ValueState<NFA<IN>> nfaOperatorState;
@@ -127,8 +127,8 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                if (nfaOperatorState == null) {
                        nfaOperatorState = getRuntimeContext().getState(
                                new ValueStateDescriptor<>(
-                                       NFA_OPERATOR_STATE_NAME,
-                                       new NFA.Serializer<IN>()));
+                                               NFA_OPERATOR_STATE_NAME,
+                                               new 
NFA.NFASerializer<>(inputSerializer)));
                }
 
                @SuppressWarnings("unchecked,rawtypes")
@@ -311,12 +311,20 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                                VoidNamespaceSerializer.INSTANCE,
                                this);
 
+               // this is with the old serializer so that we can read the 
state.
+               ValueState<NFA<IN>> oldNfaOperatorState = 
getRuntimeContext().getState(
+                               new ValueStateDescriptor<>("nfaOperatorState", 
new NFA.Serializer<IN>()));
+
                if (migratingFromOldKeyedOperator) {
                        int numberEntries = inputView.readInt();
-                       for (int i = 0; i <numberEntries; i++) {
+                       for (int i = 0; i < numberEntries; i++) {
                                KEY key = keySerializer.deserialize(inputView);
                                setCurrentKey(key);
                                saveRegisterWatermarkTimer();
+
+                               NFA<IN> nfa = oldNfaOperatorState.value();
+                               oldNfaOperatorState.clear();
+                               nfaOperatorState.update(nfa);
                        }
                } else {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 11d193a..2619764 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -20,16 +20,19 @@ package org.apache.flink.cep.nfa;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -185,35 +188,136 @@ public class NFATest extends TestLogger {
 
        @Test
        public void testNFASerialization() throws IOException, 
ClassNotFoundException {
-               NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0, 
false);
-
-               State<Event> startingState = new State<>("", 
State.StateType.Start);
-               State<Event> startState = new State<>("start", 
State.StateType.Normal);
-               State<Event> endState = new State<>("end", 
State.StateType.Final);
-
-
-               startingState.addTake(
-                       new NameFilter("start"));
-               startState.addTake(
-                       new NameFilter("end"));
-               startState.addIgnore(null);
-
-               nfa.addState(startingState);
-               nfa.addState(startState);
-               nfa.addState(endState);
-
-               ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               ObjectOutputStream oos = new ObjectOutputStream(baos);
-
-               oos.writeObject(nfa);
-
-               ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
-               ObjectInputStream ois = new ObjectInputStream(bais);
-
-               @SuppressWarnings("unchecked")
-               NFA<Event> copy = (NFA<Event>) ois.readObject();
-
-               assertEquals(nfa, copy);
+               Pattern<Event, ?> pattern1 = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
1858562682635302605L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
8061969839441121955L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               
}).oneOrMore().optional().allowCombinations().followedByAny("end").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
8061969839441121955L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               });
+
+               Pattern<Event, ?> pattern2 = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
1858562682635302605L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).notFollowedBy("not").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
-6085237016591726715L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
8061969839441121955L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               
}).oneOrMore().optional().allowCombinations().followedByAny("end").where(new 
IterativeCondition<Event>() {
+                       private static final long serialVersionUID = 
8061969839441121955L;
+
+                       @Override
+                       public boolean filter(Event value, Context<Event> ctx) 
throws Exception {
+                               double sum = 0.0;
+                               for (Event e : 
ctx.getEventsForPattern("middle")) {
+                                       sum += e.getPrice();
+                               }
+                               return sum > 5.0;
+                       }
+               });
+
+               Pattern<Event, ?> pattern3 = Pattern.<Event>begin("start")
+                               .notFollowedBy("not").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
-6085237016591726715L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
8061969839441121955L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               
}).oneOrMore().allowCombinations().followedByAny("end").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
8061969839441121955L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               });
+
+               List<Pattern<Event, ?>> patterns = new ArrayList<>();
+               patterns.add(pattern1);
+               patterns.add(pattern2);
+               patterns.add(pattern3);
+
+               for (Pattern<Event, ?> p: patterns) {
+                       NFACompiler.NFAFactory<Event> nfaFactory = 
NFACompiler.compileFactory(p, Event.createTypeSerializer(), false);
+                       NFA<Event> nfa = nfaFactory.createNFA();
+
+                       Event a = new Event(40, "a", 1.0);
+                       Event b = new Event(41, "b", 2.0);
+                       Event c = new Event(42, "c", 3.0);
+                       Event b1 = new Event(41, "b", 3.0);
+                       Event b2 = new Event(41, "b", 4.0);
+                       Event b3 = new Event(41, "b", 5.0);
+                       Event d = new Event(43, "d", 4.0);
+
+                       nfa.process(a, 1);
+                       nfa.process(b, 2);
+                       nfa.process(c, 3);
+                       nfa.process(b1, 4);
+                       nfa.process(b2, 5);
+                       nfa.process(b3, 6);
+                       nfa.process(d, 7);
+                       nfa.process(a, 8);
+
+                       NFA.NFASerializer<Event> serializer = new 
NFA.NFASerializer<>(Event.createTypeSerializer());
+
+                       //serialize
+                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                       serializer.serialize(nfa, new 
DataOutputViewStreamWrapper(baos));
+                       baos.close();
+
+                       // copy
+                       NFA.NFASerializer<Event> copySerializer = new 
NFA.NFASerializer<>(Event.createTypeSerializer());
+                       ByteArrayInputStream in = new 
ByteArrayInputStream(baos.toByteArray());
+                       ByteArrayOutputStream out = new ByteArrayOutputStream();
+                       copySerializer.copy(new DataInputViewStreamWrapper(in), 
new DataOutputViewStreamWrapper(out));
+                       in.close();
+                       out.close();
+
+                       // deserialize
+                       ByteArrayInputStream bais = new 
ByteArrayInputStream(out.toByteArray());
+                       NFA.NFASerializer<Event> deserializer = new 
NFA.NFASerializer<>(Event.createTypeSerializer());
+                       NFA<Event> copy = deserializer.deserialize(new 
DataInputViewStreamWrapper(bais));
+                       bais.close();
+
+                       assertEquals(nfa, copy);
+               }
        }
 
        private NFA<Event> createStartEndNFA(long windowLength) {
@@ -251,20 +355,4 @@ public class NFATest extends TestLogger {
 
                return nfa;
        }
-
-       private static class NameFilter extends SimpleCondition<Event> {
-
-               private static final long serialVersionUID = 
7472112494752423802L;
-
-               private final String name;
-
-               public NameFilter(final String name) {
-                       this.name = name;
-               }
-
-               @Override
-               public boolean filter(Event value) throws Exception {
-                       return value.getName().equals(name);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index ee94b6f..bd828b6 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -20,7 +20,10 @@ package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.cep.Event;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -130,15 +133,14 @@ public class SharedBufferTest extends TestLogger {
                sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], 
timestamp, 5, DeweyNumber.fromString("1.1"));
                sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], 
timestamp, 6, DeweyNumber.fromString("1.1.0"));
 
-               ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               ObjectOutputStream oos = new ObjectOutputStream(baos);
+               SharedBuffer.SharedBufferSerializer serializer = new 
SharedBuffer.SharedBufferSerializer(
+                               StringSerializer.INSTANCE, 
Event.createTypeSerializer());
 
-               oos.writeObject(sharedBuffer);
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               serializer.serialize(sharedBuffer, new 
DataOutputViewStreamWrapper(baos));
 
                ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
-               ObjectInputStream ois = new ObjectInputStream(bais);
-
-               SharedBuffer<String, Event> copy = (SharedBuffer<String, 
Event>)ois.readObject();
+               SharedBuffer<String, Event> copy = serializer.deserialize(new 
DataInputViewStreamWrapper(bais));
 
                assertEquals(sharedBuffer, copy);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index d9efb1b..fb05901 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -103,7 +103,6 @@ public class CEPFrom12MigrationTest {
        }
 
        @Test
-       @Ignore
        public void testRestoreAfterBranchingPattern() throws Exception {
 
                KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
@@ -171,6 +170,54 @@ public class CEPFrom12MigrationTest {
                assertEquals(middleEvent2, patternMap2.get("middle").get(0));
                assertEquals(endEvent, patternMap2.get("end").get(0));
 
+               // and now go for a checkpoint with the new serializers
+
+               final Event startEvent1 = new Event(42, "start", 2.0);
+               final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 
11.0);
+               final Event endEvent1 = new Event(42, "end", 2.0);
+
+               harness.processElement(new StreamRecord<Event>(startEvent1, 
21));
+               harness.processElement(new StreamRecord<Event>(middleEvent3, 
23));
+
+               // simulate snapshot/restore with some elements in internal 
sorting queue
+               OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+               harness.close();
+
+               harness = new KeyedOneInputStreamOperatorTestHarness<>(
+                               new KeyedCEPPatternOperator<>(
+                                               Event.createTypeSerializer(),
+                                               false,
+                                               IntSerializer.INSTANCE,
+                                               new NFAFactory(),
+                                               true),
+                               keySelector,
+                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.initializeState(snapshot);
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+               harness.processWatermark(new Watermark(50));
+
+               result = harness.getOutput();
+
+               // watermark and the result
+               assertEquals(2, result.size());
+
+               Object resultObject3 = result.poll();
+               assertTrue(resultObject3 instanceof StreamRecord);
+               StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+               assertTrue(resultRecord3.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, List<Event>> patternMap3 = (Map<String, 
List<Event>>) resultRecord3.getValue();
+
+               assertEquals(startEvent1, patternMap3.get("start").get(0));
+               assertEquals(middleEvent3, patternMap3.get("middle").get(0));
+               assertEquals(endEvent1, patternMap3.get("end").get(0));
+
                harness.close();
        }
 
@@ -220,7 +267,6 @@ public class CEPFrom12MigrationTest {
        }
 
        @Test
-       @Ignore
        public void testRestoreStartingNewPatternAfterMigration() throws 
Exception {
 
                KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
@@ -302,6 +348,54 @@ public class CEPFrom12MigrationTest {
                assertEquals(middleEvent2, patternMap3.get("middle").get(0));
                assertEquals(endEvent, patternMap3.get("end").get(0));
 
+               // and now go for a checkpoint with the new serializers
+
+               final Event startEvent3 = new Event(42, "start", 2.0);
+               final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 
11.0);
+               final Event endEvent1 = new Event(42, "end", 2.0);
+
+               harness.processElement(new StreamRecord<Event>(startEvent3, 
21));
+               harness.processElement(new StreamRecord<Event>(middleEvent3, 
23));
+
+               // simulate snapshot/restore with some elements in internal 
sorting queue
+               OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+               harness.close();
+
+               harness = new KeyedOneInputStreamOperatorTestHarness<>(
+                               new KeyedCEPPatternOperator<>(
+                                               Event.createTypeSerializer(),
+                                               false,
+                                               IntSerializer.INSTANCE,
+                                               new NFAFactory(),
+                                               true),
+                               keySelector,
+                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.initializeState(snapshot);
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+               harness.processWatermark(new Watermark(50));
+
+               result = harness.getOutput();
+
+               // watermark and the result
+               assertEquals(2, result.size());
+
+               Object resultObject4 = result.poll();
+               assertTrue(resultObject4 instanceof StreamRecord);
+               StreamRecord<?> resultRecord4 = (StreamRecord<?>) resultObject4;
+               assertTrue(resultRecord4.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, List<Event>> patternMap4 = (Map<String, 
List<Event>>) resultRecord4.getValue();
+
+               assertEquals(startEvent3, patternMap4.get("start").get(0));
+               assertEquals(middleEvent3, patternMap4.get("middle").get(0));
+               assertEquals(endEvent1, patternMap4.get("end").get(0));
+
                harness.close();
        }
 
@@ -347,7 +441,6 @@ public class CEPFrom12MigrationTest {
 
 
        @Test
-       @Ignore
        public void testSinglePatternAfterMigration() throws Exception {
 
                KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {

Reply via email to