http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 11f14b9..78ac39c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -43,11 +43,6 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Predicate;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -55,7 +50,6 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OptionalDataException;
 import java.io.Serializable;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -716,9 +710,7 @@ public class NFA<T> implements Serializable {
                return result;
        }
 
-       //////////////////////                  Fault-Tolerance / Migration                 //////////////////////
-
-       private static final String BEGINNING_STATE_NAME = "$beginningState$";
+       //////////////////////                  Fault-Tolerance                 //////////////////////
 
        private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
                ois.defaultReadObject();
@@ -729,103 +721,15 @@ public class NFA<T> implements Serializable {
 
                final List<ComputationState<T>> readComputationStates = new ArrayList<>(numberComputationStates);
 
-               boolean afterMigration = false;
                for (int i = 0; i < numberComputationStates; i++) {
                        ComputationState<T> computationState = readComputationState(ois);
-                       if (computationState.getState().getName().equals(BEGINNING_STATE_NAME)) {
-                               afterMigration = true;
-                       }
-
                        readComputationStates.add(computationState);
                }
 
-               if (afterMigration && !readComputationStates.isEmpty()) {
-                       try {
-                               //Backwards compatibility
-                               this.computationStates.addAll(migrateNFA(readComputationStates));
-                               final Field newSharedBufferField = NFA.class.getDeclaredField("eventSharedBuffer");
-                               final Field sharedBufferField = NFA.class.getDeclaredField("sharedBuffer");
-                               sharedBufferField.setAccessible(true);
-                               newSharedBufferField.setAccessible(true);
-                               newSharedBufferField.set(this, SharedBuffer.migrateSharedBuffer(this.sharedBuffer));
-                               sharedBufferField.set(this, null);
-                               sharedBufferField.setAccessible(false);
-                               newSharedBufferField.setAccessible(false);
-                       } catch (Exception e) {
-                               throw new IllegalStateException("Could not migrate from earlier version", e);
-                       }
-               } else {
-                       this.computationStates.addAll(readComputationStates);
-               }
-
+               this.computationStates.addAll(readComputationStates);
                nonDuplicatingTypeSerializer.clearReferences();
        }
 
-       /**
-        * Needed for backward compatibility. First migrates the {@link State} graph, see {@link NFACompiler#migrateGraph(State)}.
-        * Then recreates the {@link ComputationState}s with the new {@link State} graph.
-        * @param readStates computation states read from snapshot
-        * @return collection of migrated computation states
-        */
-       private Collection<ComputationState<T>> migrateNFA(Collection<ComputationState<T>> readStates) {
-               final ArrayList<ComputationState<T>> computationStates = new ArrayList<>();
-
-               final State<T> startState = Iterators.find(
-                       readStates.iterator(),
-                       new Predicate<ComputationState<T>>() {
-                               @Override
-                               public boolean apply(@Nullable ComputationState<T> input) {
-                                       return input != null && input.getState().getName().equals(BEGINNING_STATE_NAME);
-                               }
-                       }).getState();
-
-               final Map<String, State<T>> convertedStates = NFACompiler.migrateGraph(startState);
-
-               for (ComputationState<T> readState : readStates) {
-                       if (!readState.isStartState()) {
-                               final String previousName = readState.getState().getName();
-                               final String currentName = Iterators.find(
-                                       readState.getState().getStateTransitions().iterator(),
-                                       new Predicate<StateTransition<T>>() {
-                                               @Override
-                                               public boolean apply(@Nullable StateTransition<T> input) {
-                                                       return input != null && input.getAction() == StateTransitionAction.TAKE;
-                                               }
-                                       }).getTargetState().getName();
-
-                               final State<T> previousState = convertedStates.get(previousName);
-
-                               computationStates.add(ComputationState.createState(
-                                       this,
-                                       convertedStates.get(currentName),
-                                       previousState,
-                                       readState.getEvent(),
-                                       0,
-                                       readState.getTimestamp(),
-                                       readState.getVersion(),
-                                       readState.getStartTimestamp()
-                               ));
-                       }
-               }
-
-               final String startName = Iterators.find(convertedStates.values().iterator(), new Predicate<State<T>>() {
-                       @Override
-                       public boolean apply(@Nullable State<T> input) {
-                               return input != null && input.isStart();
-                       }
-               }).getName();
-
-               computationStates.add(ComputationState.createStartState(
-                       this,
-                       convertedStates.get(startName),
-                       new DeweyNumber(this.startEventCounter)));
-
-               this.states.clear();
-               this.states.addAll(convertedStates.values());
-
-               return computationStates;
-       }
-
        @SuppressWarnings("unchecked")
        private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
                final State<T> state = (State<T>) ois.readObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index c6f69b9..c36e7df 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
@@ -335,47 +334,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
                this.pages = pages;
        }
 
-       /**
-        * For backward compatibility only. Previously the key in {@link SharedBuffer} was {@link State}.
-        * Now it is {@link String}.
-        */
-       @Internal
-       static <T> SharedBuffer<String, T> migrateSharedBuffer(SharedBuffer<State<T>, T> buffer) {
-
-               final Map<String, SharedBufferPage<String, T>> pageMap = new HashMap<>();
-               final Map<SharedBufferEntry<State<T>, T>, SharedBufferEntry<String, T>> entries = new HashMap<>();
-
-               for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
-                       final SharedBufferPage<String, T> newPage = new SharedBufferPage<>(page.getKey().getName());
-                       pageMap.put(newPage.getKey(), newPage);
-
-                       for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
-                               final SharedBufferEntry<String, T> newSharedBufferEntry = new SharedBufferEntry<>(
-                                       pageEntry.getKey(),
-                                       newPage);
-                               newSharedBufferEntry.referenceCounter = pageEntry.getValue().referenceCounter;
-                               entries.put(pageEntry.getValue(), newSharedBufferEntry);
-                               newPage.entries.put(pageEntry.getKey(), newSharedBufferEntry);
-                       }
-               }
-
-               for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
-                       for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
-                               final SharedBufferEntry<String, T> newEntry = entries.get(pageEntry.getValue());
-                               for (SharedBufferEdge<State<T>, T> edge : pageEntry.getValue().edges) {
-                                       final SharedBufferEntry<String, T> targetNewEntry = entries.get(edge.getTarget());
-
-                                       final SharedBufferEdge<String, T> newEdge = new SharedBufferEdge<>(
-                                               targetNewEntry,
-                                               edge.getVersion());
-                                       newEntry.edges.add(newEdge);
-                               }
-                       }
-               }
-
-               return new SharedBuffer<>(buffer.valueSerializer, pageMap);
-       }
-
        private SharedBufferEntry<K, V> get(
                        final K key,
                        final V value,
@@ -1177,76 +1135,4 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
                        return CompatibilityResult.requiresMigration();
                }
        }
-
-       //////////////////                      Java Serialization methods for backwards compatibility                  //////////////////
-
-       private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-               DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(ois);
-               ArrayList<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
-               ois.defaultReadObject();
-
-               this.pages = new HashMap<>();
-
-               int numberPages = ois.readInt();
-
-               for (int i = 0; i < numberPages; i++) {
-                       // key of the page
-                       @SuppressWarnings("unchecked")
-                       K key = (K) ois.readObject();
-
-                       SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
-
-                       pages.put(key, page);
-
-                       int numberEntries = ois.readInt();
-
-                       for (int j = 0; j < numberEntries; j++) {
-                               // restore the SharedBufferEntries for the given page
-                               V value = valueSerializer.deserialize(source);
-                               long timestamp = ois.readLong();
-
-                               ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp, 0);
-                               SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page);
-
-                               sharedBufferEntry.referenceCounter = ois.readInt();
-
-                               page.entries.put(valueTimeWrapper, sharedBufferEntry);
-
-                               entryList.add(sharedBufferEntry);
-                       }
-               }
-
-               // read the edges of the shared buffer entries
-               int numberEdges = ois.readInt();
-
-               for (int j = 0; j < numberEdges; j++) {
-                       int sourceIndex = ois.readInt();
-                       int targetIndex = ois.readInt();
-
-                       if (sourceIndex >= entryList.size() || sourceIndex < 0) {
-                               throw new RuntimeException("Could not find source entry with index " + sourceIndex +
-                                               ". This indicates a corrupted state.");
-                       } else {
-                               // We've already deserialized the shared buffer entry. Simply read its ID and
-                               // retrieve the buffer entry from the list of entries
-                               SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex);
-
-                               final DeweyNumber version = (DeweyNumber) ois.readObject();
-                               final SharedBufferEntry<K, V> target;
-
-                               if (targetIndex >= 0) {
-                                       if (targetIndex >= entryList.size()) {
-                                               throw new RuntimeException("Could not find target entry with index " + targetIndex +
-                                                               ". This indicates a corrupted state.");
-                                       } else {
-                                               target = entryList.get(targetIndex);
-                                       }
-                               } else {
-                                       target = null;
-                               }
-
-                               sourceEntry.edges.add(new SharedBufferEdge<K, V>(target, version));
-                       }
-               }
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 593c94f..5698de6 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
@@ -36,11 +35,6 @@ import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Predicate;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -858,114 +852,6 @@ public class NFACompiler {
        }
 
        /**
-        * Used for migrating CEP graphs prior to 1.3. It removes the dummy start, adds the dummy end, and translates all
-        * states to consuming ones by moving all TAKEs and IGNOREs to the next state. This method assumes each state
-        * has at most one TAKE and one IGNORE and that the name of each state is unique. No PROCEED transition is allowed!
-        *
-        * @param oldStartState dummy start state of old graph
-        * @param <T>           type of events
-        * @return map of new states, where the key is the name of a state and the value is the state itself
-        */
-       @Internal
-       public static <T> Map<String, State<T>> migrateGraph(State<T> oldStartState) {
-               State<T> oldFirst = oldStartState;
-               State<T> oldSecond = oldStartState.getStateTransitions().iterator().next().getTargetState();
-
-               StateTransition<T> oldFirstToSecondTake = Iterators.find(
-                       oldFirst.getStateTransitions().iterator(),
-                       new Predicate<StateTransition<T>>() {
-                               @Override
-                               public boolean apply(@Nullable StateTransition<T> input) {
-                                       return input != null && input.getAction() == StateTransitionAction.TAKE;
-                               }
-
-                       });
-
-               StateTransition<T> oldFirstIgnore = Iterators.find(
-                       oldFirst.getStateTransitions().iterator(),
-                       new Predicate<StateTransition<T>>() {
-                               @Override
-                               public boolean apply(@Nullable StateTransition<T> input) {
-                                       return input != null && input.getAction() == StateTransitionAction.IGNORE;
-                               }
-
-                       }, null);
-
-               StateTransition<T> oldSecondToThirdTake = Iterators.find(
-                       oldSecond.getStateTransitions().iterator(),
-                       new Predicate<StateTransition<T>>() {
-                               @Override
-                               public boolean apply(@Nullable StateTransition<T> input) {
-                                       return input != null && input.getAction() == StateTransitionAction.TAKE;
-                               }
-
-                       }, null);
-
-               final Map<String, State<T>> convertedStates = new HashMap<>();
-               State<T> newSecond;
-               State<T> newFirst = new State<>(oldSecond.getName(), State.StateType.Start);
-               convertedStates.put(newFirst.getName(), newFirst);
-               while (oldSecondToThirdTake != null) {
-
-                       newSecond = new State<T>(oldSecondToThirdTake.getTargetState().getName(), State.StateType.Normal);
-                       convertedStates.put(newSecond.getName(), newSecond);
-                       newFirst.addTake(newSecond, oldFirstToSecondTake.getCondition());
-
-                       if (oldFirstIgnore != null) {
-                               newFirst.addIgnore(oldFirstIgnore.getCondition());
-                       }
-
-                       oldFirst = oldSecond;
-
-                       oldFirstToSecondTake = Iterators.find(
-                               oldFirst.getStateTransitions().iterator(),
-                               new Predicate<StateTransition<T>>() {
-                                       @Override
-                                       public boolean apply(@Nullable StateTransition<T> input) {
-                                               return input != null && input.getAction() == StateTransitionAction.TAKE;
-                                       }
-
-                               });
-
-                       oldFirstIgnore = Iterators.find(
-                               oldFirst.getStateTransitions().iterator(),
-                               new Predicate<StateTransition<T>>() {
-                                       @Override
-                                       public boolean apply(@Nullable StateTransition<T> input) {
-                                               return input != null && input.getAction() == StateTransitionAction.IGNORE;
-                                       }
-
-                               }, null);
-
-                       oldSecond = oldSecondToThirdTake.getTargetState();
-
-                       oldSecondToThirdTake = Iterators.find(
-                               oldSecond.getStateTransitions().iterator(),
-                               new Predicate<StateTransition<T>>() {
-                                       @Override
-                                       public boolean apply(@Nullable StateTransition<T> input) {
-                                               return input != null && input.getAction() == StateTransitionAction.TAKE;
-                                       }
-
-                               }, null);
-
-                       newFirst = newSecond;
-               }
-
-               final State<T> endingState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
-
-               newFirst.addTake(endingState, oldFirstToSecondTake.getCondition());
-
-               if (oldFirstIgnore != null) {
-                       newFirst.addIgnore(oldFirstIgnore.getCondition());
-               }
-
-               convertedStates.put(endingState.getName(), endingState);
-
-               return convertedStates;
-       }
-
-       /**
         * Factory interface for {@link NFA}.
         *
         * @param <T> Type of the input events which are processed by the NFA

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7556d9f..257d3e7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -24,48 +24,29 @@ import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Migration;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -80,23 +61,17 @@ import java.util.stream.StreamSupport;
  * @param <IN> Type of the input elements
  * @param <KEY> Type of the key on which the input stream is keyed
  * @param <OUT> Type of the output elements
- * @param <F> user function that can be applied to matching sequences or timed out sequences
  */
 public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function>
        extends AbstractUdfStreamOperator<OUT, F>
-       implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace>, CheckpointedRestoringOperator {
+       implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace> {
 
        private static final long serialVersionUID = -4166778210774160757L;
 
-       private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
-
        private final boolean isProcessingTime;
 
        private final TypeSerializer<IN> inputSerializer;
 
-       // necessary to serialize the set of seen keys
-       private final TypeSerializer<KEY> keySerializer;
-
        ///////////////                 State                   //////////////
 
        private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
@@ -115,12 +90,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
         */
        private long lastWatermark;
 
-       /**
-        * A flag used in the case of migration that indicates if
-        * we are restoring from an old keyed or non-keyed operator.
-        */
-       private final boolean migratingFromOldKeyedOperator;
-
        private final EventComparator<IN> comparator;
 
        public AbstractKeyedCEPPatternOperator(
@@ -135,10 +104,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 
                this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
                this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
-               this.keySerializer = Preconditions.checkNotNull(keySerializer);
                this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
-
-               this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
                this.comparator = comparator;
        }
 
@@ -384,295 +350,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
                        long timestamp) throws Exception {
        }
 
-       //////////////////////                  Backwards Compatibility                 //////////////////////
-
-       @Override
-       public void restoreState(FSDataInputStream in) throws Exception {
-               if (in instanceof Migration) {
-                       // absorb the introduced byte from the migration stream
-                       int hasUdfState = in.read();
-                       if (hasUdfState == 1) {
-                               throw new Exception("Found UDF state but CEPOperator is not an UDF operator.");
-                       }
-               }
-
-               DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(in);
-               timerService = getInternalTimerService(
-                               "watermark-callbacks",
-                               VoidNamespaceSerializer.INSTANCE,
-                               this);
-
-               // this is with the old serializer so that we can read the state.
-               ValueState<NFA<IN>> oldNfaOperatorState = getRuntimeContext().getState(
-                               new ValueStateDescriptor<>("nfaOperatorState", new NFA.Serializer<IN>()));
-
-               ValueState<PriorityQueue<StreamRecord<IN>>> oldPriorityQueueOperatorState =
-                               getRuntimeContext().getState(
-                                       new ValueStateDescriptor<>(
-                                                       "priorityQueueStateName",
-                                                       new PriorityQueueSerializer<>(
-                                                                       ((TypeSerializer) new StreamElementSerializer<>(inputSerializer)),
-                                                                       new PriorityQueueStreamRecordFactory<IN>()
-                                                       )
-                                       )
-                       );
-
-               if (migratingFromOldKeyedOperator) {
-                       int numberEntries = inputView.readInt();
-                       for (int i = 0; i < numberEntries; i++) {
-                               KEY key = keySerializer.deserialize(inputView);
-                               setCurrentKey(key);
-                               saveRegisterWatermarkTimer();
-
-                               NFA<IN> nfa = oldNfaOperatorState.value();
-                               oldNfaOperatorState.clear();
-                               nfaOperatorState.update(nfa);
-
-                               PriorityQueue<StreamRecord<IN>> priorityQueue = oldPriorityQueueOperatorState.value();
-                               if (priorityQueue != null && !priorityQueue.isEmpty()) {
-                                       Map<Long, List<IN>> elementMap = new HashMap<>();
-                                       for (StreamRecord<IN> record: priorityQueue) {
-                                               long timestamp = record.getTimestamp();
-                                               IN element = record.getValue();
-
-                                               List<IN> elements = elementMap.get(timestamp);
-                                               if (elements == null) {
-                                                       elements = new ArrayList<>();
-                                                       elementMap.put(timestamp, elements);
-                                               }
-                                               elements.add(element);
-                                       }
-
-                                       // write the old state into the new one.
-                                       for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
-                                               elementQueueState.put(entry.getKey(), entry.getValue());
-                                       }
-                                       }
-
-                                       // clear the old state
-                                       oldPriorityQueueOperatorState.clear();
-                               }
-                       }
-               } else {
-
-                       final ObjectInputStream ois = new ObjectInputStream(in);
-
-                       // retrieve the NFA
-                       @SuppressWarnings("unchecked")
-                       NFA<IN> nfa = (NFA<IN>) ois.readObject();
-
-                       // retrieve the elements that were pending in the priority queue
-                       MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-
-                       Map<Long, List<IN>> elementMap = new HashMap<>();
-                       int entries = ois.readInt();
-                       for (int i = 0; i < entries; i++) {
-                               StreamElement streamElement = recordSerializer.deserialize(inputView);
-                               StreamRecord<IN> record = streamElement.<IN>asRecord();
-
-                               long timestamp = record.getTimestamp();
-                               IN element = record.getValue();
-
-                               List<IN> elements = elementMap.get(timestamp);
-                               if (elements == null) {
-                                       elements = new ArrayList<>();
-                                       elementMap.put(timestamp, elements);
-                               }
-                               elements.add(element);
-                       }
-
-                       // finally register the retrieved state with the new keyed state.
-                       setCurrentKey((byte) 0);
-                       nfaOperatorState.update(nfa);
-
-                       // write the priority queue to the new map state.
-                       for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
-                               elementQueueState.put(entry.getKey(), entry.getValue());
-                       }
-
-                       if (!isProcessingTime) {
-                               // this is relevant only for event/ingestion time
-                               setCurrentKey((byte) 0);
-                               saveRegisterWatermarkTimer();
-                       }
-                       ois.close();
-               }
-       }
-
-       //////////////////////                  Utility Classes                 //////////////////////
-
-       /**
-        * Custom type serializer implementation to serialize priority queues.
-        *
-        * @param <T> Type of the priority queue's elements
-        */
-       private static class PriorityQueueSerializer<T> extends TypeSerializer<PriorityQueue<T>> {
-
-               private static final long serialVersionUID = -231980397616187715L;
-
-               private final TypeSerializer<T> elementSerializer;
-               private final PriorityQueueFactory<T> factory;
-
-               PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) {
-                       this.elementSerializer = elementSerializer;
-                       this.factory = factory;
-               }
-
-               @Override
-               public boolean isImmutableType() {
-                       return false;
-               }
-
-               @Override
-               public TypeSerializer<PriorityQueue<T>> duplicate() {
-                       return new PriorityQueueSerializer<>(elementSerializer.duplicate(), factory);
-               }
-
-               @Override
-               public PriorityQueue<T> createInstance() {
-                       return factory.createPriorityQueue();
-               }
-
-               @Override
-               public PriorityQueue<T> copy(PriorityQueue<T> from) {
-                       PriorityQueue<T> result = factory.createPriorityQueue();
-
-                       for (T element: from) {
-                               result.offer(elementSerializer.copy(element));
-                       }
-
-                       return result;
-               }
-
-               @Override
-               public PriorityQueue<T> copy(PriorityQueue<T> from, PriorityQueue<T> reuse) {
-                       reuse.clear();
-
-                       for (T element: from) {
-                               reuse.offer(elementSerializer.copy(element));
-                       }
-
-                       return reuse;
-               }
-
-               @Override
-               public int getLength() {
-                       return 0;
-               }
-
-               @Override
-               public void serialize(PriorityQueue<T> record, DataOutputView target) throws IOException {
-                       target.writeInt(record.size());
-
-                       for (T element: record) {
-                               elementSerializer.serialize(element, target);
-                       }
-               }
-
-               @Override
-               public PriorityQueue<T> deserialize(DataInputView source) throws IOException {
-                       PriorityQueue<T> result = factory.createPriorityQueue();
-
-                       return deserialize(result, source);
-               }
-
-               @Override
-               public PriorityQueue<T> deserialize(PriorityQueue<T> reuse, DataInputView source) throws IOException {
-                       reuse.clear();
-
-                       int numberEntries = source.readInt();
-
-                       for (int i = 0; i < numberEntries; i++) {
-                               reuse.offer(elementSerializer.deserialize(source));
-                       }
-
-                       return reuse;
-               }
-
-               @Override
-               public void copy(DataInputView source, DataOutputView target) throws IOException {
-
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj instanceof PriorityQueueSerializer) {
-                               @SuppressWarnings("unchecked")
-                               PriorityQueueSerializer<T> other = (PriorityQueueSerializer<T>) obj;
-
-                               return factory.equals(other.factory) && elementSerializer.equals(other.elementSerializer);
-                       } else {
-                               return false;
-                       }
-               }
-
-               @Override
-               public boolean canEqual(Object obj) {
-                       return obj instanceof PriorityQueueSerializer;
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(factory, elementSerializer);
-               }
-
-               // --------------------------------------------------------------------------------------------
-               // Serializer configuration snapshotting & compatibility
-               // --------------------------------------------------------------------------------------------
-
-               @Override
-               public TypeSerializerConfigSnapshot snapshotConfiguration() {
-                       return new CollectionSerializerConfigSnapshot<>(elementSerializer);
-               }
-
-               @Override
-               public CompatibilityResult<PriorityQueue<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-                       if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
-                               Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousElemSerializerAndConfig =
-                                       ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-                               CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-                                       previousElemSerializerAndConfig.f0,
-                                       UnloadableDummyTypeSerializer.class,
-                                       previousElemSerializerAndConfig.f1,
-                                       elementSerializer);
-
-                               if (!compatResult.isRequiresMigration()) {
-                                       return CompatibilityResult.compatible();
-                               } else if (compatResult.getConvertDeserializer() != null) {
-                                       return CompatibilityResult.requiresMigration(
-                                               new PriorityQueueSerializer<>(
-                                                       new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), factory));
-                               }
-                       }
-
-                       return CompatibilityResult.requiresMigration();
-               }
-       }
-
-       private interface PriorityQueueFactory<T> extends Serializable {
-               PriorityQueue<T> createPriorityQueue();
-       }
-
-       private static class PriorityQueueStreamRecordFactory<T> implements PriorityQueueFactory<StreamRecord<T>> {
-
-               private static final long serialVersionUID = 1254766984454616593L;
-
-               @Override
-               public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
-                       return new PriorityQueue<StreamRecord<T>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<T>());
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       return obj instanceof PriorityQueueStreamRecordFactory;
-               }
-
-               @Override
-               public int hashCode() {
-                       return getClass().hashCode();
-               }
-       }
-
        //////////////////////                  Testing Methods                 //////////////////////
 
        @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
deleted file mode 100644
index 843d668..0000000
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.ByteSerializer;
-import org.apache.flink.api.java.functions.NullByteKeySelector;
-import org.apache.flink.cep.Event;
-import org.apache.flink.cep.SubEvent;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.cep.pattern.conditions.SimpleCondition;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for migration from 1.1.x to 1.3.x.
- */
-public class CEPMigration11to13Test {
-
-       private static String getResourceFilename(String filename) {
-               ClassLoader cl = CEPMigration11to13Test.class.getClassLoader();
-               URL resource = cl.getResource(filename);
-               if (resource == null) {
-                       throw new NullPointerException("Missing snapshot resource.");
-               }
-               return resource.getFile();
-       }
-
-       @Test
-       public void testKeyedCEPOperatorMigratation() throws Exception {
-
-               final Event startEvent = new Event(42, "start", 1.0);
-               final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-               final Event endEvent = new Event(42, "end", 1.0);
-
-               // uncomment these lines for regenerating the snapshot on Flink 1.1
-               /*
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
-                               new KeyedCepOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               keySelector,
-                                               IntSerializer.INSTANCE,
-                                               new NFAFactory()));
-               harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
-               harness.open();
-               harness.processElement(new StreamRecord<Event>(startEvent, 1));
-               harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
-               harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
-               harness.processWatermark(new Watermark(2));
-
-               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
-
-               // simulate snapshot/restore with empty element queue but NFA state
-               StreamTaskState snapshot = harness.snapshot(1, 1);
-               FileOutputStream out = new FileOutputStream(
-                               "src/test/resources/cep-keyed-1_1-snapshot");
-               ObjectOutputStream oos = new ObjectOutputStream(out);
-               oos.writeObject(snapshot);
-               out.close();
-               harness.close();
-               */
-
-               OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
-                       CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory()));
-
-               try {
-                       harness.setup();
-                       harness
-                               .initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
-                       harness.open();
-
-                       harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
-                       harness.processElement(new StreamRecord<>(endEvent, 5));
-
-                       harness.processWatermark(new Watermark(20));
-
-                       ConcurrentLinkedQueue<Object> result = harness.getOutput();
-
-                       // watermark and the result
-                       assertEquals(2, result.size());
-
-                       Object resultObject = result.poll();
-                       assertTrue(resultObject instanceof StreamRecord);
-                       StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-                       assertTrue(resultRecord.getValue() instanceof Map);
-
-                       @SuppressWarnings("unchecked")
-                       Map<String, List<Event>> patternMap =
-                               (Map<String, List<Event>>) resultRecord.getValue();
-
-                       assertEquals(startEvent, patternMap.get("start").get(0));
-                       assertEquals(middleEvent, patternMap.get("middle").get(0));
-                       assertEquals(endEvent, patternMap.get("end").get(0));
-
-                       // and now go for a checkpoint with the new serializers
-
-                       final Event startEvent1 = new Event(42, "start", 2.0);
-                       final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
-                       final Event endEvent1 = new Event(42, "end", 2.0);
-
-                       harness.processElement(new StreamRecord<Event>(startEvent1, 21));
-                       harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
-
-                       // simulate snapshot/restore with some elements in internal sorting queue
-                       OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
-                       harness.close();
-
-                       harness = CepOperatorTestUtilities.getCepTestHarness(CepOperatorTestUtilities.getKeyedCepOpearator(
-                               false,
-                               new NFAFactory()));
-
-                       harness.setup();
-                       harness.initializeState(snapshot);
-                       harness.open();
-
-                       harness.processElement(new StreamRecord<>(endEvent1, 25));
-
-                       harness.processWatermark(new Watermark(50));
-
-                       result = harness.getOutput();
-
-                       // watermark and the result
-                       assertEquals(2, result.size());
-
-                       Object resultObject1 = result.poll();
-                       assertTrue(resultObject1 instanceof StreamRecord);
-                       StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
-                       assertTrue(resultRecord1.getValue() instanceof Map);
-
-                       @SuppressWarnings("unchecked")
-                       Map<String, List<Event>> patternMap1 =
-                               (Map<String, List<Event>>) resultRecord1.getValue();
-
-                       assertEquals(startEvent1, patternMap1.get("start").get(0));
-                       assertEquals(middleEvent1, patternMap1.get("middle").get(0));
-                       assertEquals(endEvent1, patternMap1.get("end").get(0));
-               } finally {
-                       harness.close();
-               }
-       }
-
-       @Test
-       public void testNonKeyedCEPFunctionMigration() throws Exception {
-
-               final Event startEvent = new Event(42, "start", 1.0);
-               final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-               final Event endEvent = new Event(42, "end", 1.0);
-
-               // uncomment these lines for regenerating the snapshot on Flink 1.1
-               /*
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
-                               new CEPPatternOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               new NFAFactory()));
-               harness.open();
-               harness.processElement(new StreamRecord<Event>(startEvent, 1));
-               harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
-               harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
-               harness.processWatermark(new Watermark(2));
-
-               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
-
-               // simulate snapshot/restore with empty element queue but NFA state
-               StreamTaskState snapshot = harness.snapshot(1, 1);
-               FileOutputStream out = new FileOutputStream(
-                               "src/test/resources/cep-non-keyed-1.1-snapshot");
-               ObjectOutputStream oos = new ObjectOutputStream(out);
-               oos.writeObject(snapshot);
-               out.close();
-               harness.close();
-               */
-
-               NullByteKeySelector keySelector = new NullByteKeySelector();
-
-               OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
-                       new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
-                               CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), ByteSerializer.INSTANCE, false, null),
-                               keySelector,
-                               BasicTypeInfo.BYTE_TYPE_INFO);
-
-               try {
-                       harness.setup();
-                       harness.initializeStateFromLegacyCheckpoint(
-                               getResourceFilename("cep-non-keyed-1.1-snapshot"));
-                       harness.open();
-
-                       harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
-                       harness.processElement(new StreamRecord<>(endEvent, 5));
-
-                       harness.processWatermark(new Watermark(20));
-
-                       ConcurrentLinkedQueue<Object> result = harness.getOutput();
-
-                       // watermark and the result
-                       assertEquals(2, result.size());
-
-                       Object resultObject = result.poll();
-                       assertTrue(resultObject instanceof StreamRecord);
-                       StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-                       assertTrue(resultRecord.getValue() instanceof Map);
-
-                       @SuppressWarnings("unchecked")
-                       Map<String, List<Event>> patternMap =
-                               (Map<String, List<Event>>) resultRecord.getValue();
-
-                       assertEquals(startEvent, patternMap.get("start").get(0));
-                       assertEquals(middleEvent, patternMap.get("middle").get(0));
-                       assertEquals(endEvent, patternMap.get("end").get(0));
-
-                       // and now go for a checkpoint with the new serializers
-
-                       final Event startEvent1 = new Event(42, "start", 2.0);
-                       final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
-                       final Event endEvent1 = new Event(42, "end", 2.0);
-
-                       harness.processElement(new StreamRecord<Event>(startEvent1, 21));
-                       harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
-
-                       // simulate snapshot/restore with some elements in internal sorting queue
-                       OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
-                       harness.close();
-
-                       harness = new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
-                               CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), ByteSerializer.INSTANCE),
-                               keySelector,
-                               BasicTypeInfo.BYTE_TYPE_INFO);
-
-                       harness.setup();
-                       harness.initializeState(snapshot);
-                       harness.open();
-
-                       harness.processElement(new StreamRecord<>(endEvent1, 25));
-
-                       harness.processWatermark(new Watermark(50));
-
-                       result = harness.getOutput();
-
-                       // watermark and the result
-                       assertEquals(2, result.size());
-
-                       Object resultObject1 = result.poll();
-                       assertTrue(resultObject1 instanceof StreamRecord);
-                       StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
-                       assertTrue(resultRecord1.getValue() instanceof Map);
-
-                       @SuppressWarnings("unchecked")
-                       Map<String, List<Event>> patternMap1 =
-                               (Map<String, List<Event>>) resultRecord1.getValue();
-
-                       assertEquals(startEvent1, patternMap1.get("start").get(0));
-                       assertEquals(middleEvent1, patternMap1.get("middle").get(0));
-                       assertEquals(endEvent1, patternMap1.get("end").get(0));
-               } finally {
-                       harness.close();
-               }
-       }
-
-       private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
-
-               private static final long serialVersionUID = 1173020762472766713L;
-
-               private final boolean handleTimeout;
-
-               private NFAFactory() {
-                       this(false);
-               }
-
-               private NFAFactory(boolean handleTimeout) {
-                       this.handleTimeout = handleTimeout;
-               }
-
-               @Override
-               public NFA<Event> createNFA() {
-
-                       Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
-                                       .followedBy("middle").subtype(SubEvent.class).where(new MiddleFilter())
-                                       .followedBy("end").where(new EndFilter())
-                                       // add a window timeout to test whether timestamps of elements in the
-                                       // priority queue in CEP operator are correctly checkpointed/restored
-                                       .within(Time.milliseconds(10L));
-
-                       return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
-               }
-       }
-
-       private static class StartFilter extends SimpleCondition<Event> {
-               private static final long serialVersionUID = 5726188262756267490L;
-
-               @Override
-               public boolean filter(Event value) throws Exception {
-                       return value.getName().equals("start");
-               }
-       }
-
-       private static class MiddleFilter extends SimpleCondition<SubEvent> {
-               private static final long serialVersionUID = 6215754202506583964L;
-
-               @Override
-               public boolean filter(SubEvent value) throws Exception {
-                       return value.getVolume() > 5.0;
-               }
-       }
-
-       private static class EndFilter extends SimpleCondition<Event> {
-               private static final long serialVersionUID = 7056763917392056548L;
-
-               @Override
-               public boolean filter(Event value) throws Exception {
-                       return value.getName().equals("end");
-               }
-       }
-}

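The deleted test above exercises a full snapshot/restore round trip: restore from a legacy 1.1 snapshot, process elements, snapshot with the current serializers, then resume from that snapshot in a fresh harness. A condensed sketch of that round trip, reusing the types and harness calls from the deleted test (same imports and fields assumed; not a drop-in replacement):

    OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), ByteSerializer.INSTANCE),
                    new NullByteKeySelector(),
                    BasicTypeInfo.BYTE_TYPE_INFO);
    harness.setup();
    harness.open();

    // process a few elements, then checkpoint with the current serializers
    harness.processElement(new StreamRecord<>(new Event(42, "start", 2.0), 21));
    OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
    harness.close();

    // a fresh harness restores the snapshot and resumes processing
    harness = new KeyedOneInputStreamOperatorTestHarness<>(
            CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), ByteSerializer.INSTANCE),
            new NullByteKeySelector(),
            BasicTypeInfo.BYTE_TYPE_INFO);
    harness.setup();
    harness.initializeState(snapshot);
    harness.open();
    harness.processElement(new StreamRecord<>(new Event(42, "end", 2.0), 25));
    harness.processWatermark(new Watermark(50));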
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index cf3c921..ed28f25 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -71,7 +71,7 @@ public class CEPMigrationTest {
 
        @Parameterized.Parameters(name = "Migration Savepoint: {0}")
        public static Collection<MigrationVersion> parameters () {
-               return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3);
+               return Arrays.asList(MigrationVersion.v1_3);
        }
 
        public CEPMigrationTest(MigrationVersion migrateVersion) {

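For context, these migration tests follow the standard JUnit 4 parameterized pattern, so the change above simply shrinks the set of savepoint versions the suite restores. The shape is roughly as follows (assuming the usual @RunWith(Parameterized.class) annotation and JUnit imports, which are not visible in this hunk):

    @RunWith(Parameterized.class)
    public class CEPMigrationTest {

            private final MigrationVersion migrateVersion;

            @Parameterized.Parameters(name = "Migration Savepoint: {0}")
            public static Collection<MigrationVersion> parameters() {
                    // 1.2 savepoints are no longer restored by this suite
                    return Arrays.asList(MigrationVersion.v1_3);
            }

            public CEPMigrationTest(MigrationVersion migrateVersion) {
                    this.migrateVersion = migrateVersion;
            }
    }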
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
deleted file mode 100644
index c4e23ca..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration;
-
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * The purpose of this class is to be filled in as a placeholder for the namespace serializer when migrating from
- * a Flink 1.1 savepoint (which did not include the namespace serializer) to Flink 1.2 (which must always include a
- * (non-null) namespace serializer). It is replaced as soon as the user re-registers the state for
- * the first run under Flink 1.2 and again provides the real namespace serializer.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializable> {
-
-       public static final MigrationNamespaceSerializerProxy INSTANCE = new MigrationNamespaceSerializerProxy();
-
-       private static final long serialVersionUID = -707800010807094491L;
-
-       private MigrationNamespaceSerializerProxy() {
-       }
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public TypeSerializer<Serializable> duplicate() {
-               return this;
-       }
-
-       @Override
-       public Serializable createInstance() {
-               throw new UnsupportedOperationException(
-                               "This is just a proxy used during migration until the real type serializer is provided by the user.");
-       }
-
-       @Override
-       public Serializable copy(Serializable from) {
-               throw new UnsupportedOperationException(
-                               "This is just a proxy used during migration until the real type serializer is provided by the user.");
-       }
-
-       @Override
-       public Serializable copy(Serializable from, Serializable reuse) {
-               throw new UnsupportedOperationException(
-                               "This is just a proxy used during migration until the real type serializer is provided by the user.");
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       @Override
-       public void serialize(Serializable record, DataOutputView target) throws IOException {
-               throw new UnsupportedOperationException(
-                               "This is just a proxy used during migration until the real type serializer is provided by the user.");
-       }
-
-       @Override
-       public Serializable deserialize(DataInputView source) throws IOException {
-               throw new UnsupportedOperationException(
-                               "This is just a proxy used during migration until the real type serializer is provided by the user.");
-       }
-
-       @Override
-       public Serializable deserialize(Serializable reuse, DataInputView source) throws IOException {
-               throw new UnsupportedOperationException(
-                               "This is just a proxy used during migration until the real type serializer is provided by the user.");
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws IOException {
-               throw new UnsupportedOperationException(
-                               "This is just a proxy used during migration until the real type serializer is provided by the user.");
-       }
-
-       @Override
-       public TypeSerializerConfigSnapshot snapshotConfiguration() {
-               return new ParameterlessTypeSerializerConfig(getClass().getCanonicalName());
-       }
-
-       @Override
-       public CompatibilityResult<Serializable> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-               // always assume compatibility since we're just a proxy for migration
-               return CompatibilityResult.compatible();
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               return obj instanceof MigrationNamespaceSerializerProxy;
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return true;
-       }
-
-       @Override
-       public int hashCode() {
-               return 42;
-       }
-}

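The deleted proxy is an instance of a fail-on-use placeholder: it satisfies a non-null requirement while refusing to do any real work, and only identity/compatibility checks are answered. A standalone sketch of the pattern in plain Java (hypothetical names, not Flink API):

    // Placeholder that fills a non-null slot until the real codec is registered.
    interface Codec<T> {
            byte[] encode(T value);
    }

    final class PlaceholderCodec<T> implements Codec<T> {
            @SuppressWarnings("rawtypes")
            static final PlaceholderCodec INSTANCE = new PlaceholderCodec();

            private PlaceholderCodec() {}

            @Override
            public byte[] encode(T value) {
                    // every data-path method throws, exactly like the proxy above
                    throw new UnsupportedOperationException("placeholder; register the real codec first");
            }
    }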
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
deleted file mode 100644
index a6055a8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration;
-
-import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-
-import java.util.Collection;
-
-/**
- * Utility functions for migration.
- */
-public class MigrationUtil {
-
-       @SuppressWarnings("deprecation")
-       public static boolean isOldSavepointKeyedState(Collection<KeyedStateHandle> keyedStateHandles) {
-               return (keyedStateHandles != null)
-                               && (keyedStateHandles.size() == 1)
-                               && (keyedStateHandles.iterator().next() instanceof MigrationKeyGroupStateHandle);
-       }
-
-}

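A hypothetical call site for the deleted utility would branch between the legacy and current restore paths; restoreFromLegacySavepoint and restoreFromCurrentFormat below are made-up names for illustration only:

    void restore(Collection<KeyedStateHandle> keyedStateHandles) throws Exception {
            if (MigrationUtil.isOldSavepointKeyedState(keyedStateHandles)) {
                    // exactly one MigrationKeyGroupStateHandle: pre-1.2 savepoint
                    restoreFromLegacySavepoint(keyedStateHandles.iterator().next());
            } else {
                    restoreFromCurrentFormat(keyedStateHandles);
            }
    }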
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
deleted file mode 100644
index 5196d2d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.api.common.state;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.StateBinder;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * The old version of the {@link org.apache.flink.api.common.state.ListStateDescriptor}, retained for
- * serialization backwards compatibility.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Internal
-@Deprecated
-@SuppressWarnings("deprecation")
-public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * Creates a new {@code ListStateDescriptor} with the given name and list element type.
-        *
-        * <p>If this constructor fails (because it is not possible to describe the type via a class),
-        * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor.
-        *
-        * @param name The (unique) name for the state.
-        * @param typeClass The type of the values in the state.
-        */
-       public ListStateDescriptor(String name, Class<T> typeClass) {
-               super(name, typeClass, null);
-       }
-
-       /**
-        * Creates a new {@code ListStateDescriptor} with the given name and list element type.
-        *
-        * @param name The (unique) name for the state.
-        * @param typeInfo The type of the values in the state.
-        */
-       public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
-               super(name, typeInfo, null);
-       }
-
-       /**
-        * Creates a new {@code ListStateDescriptor} with the given name and list element type.
-        *
-        * @param name The (unique) name for the state.
-        * @param typeSerializer The type serializer for the list values.
-        */
-       public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
-               super(name, typeSerializer, null);
-       }
-
-       // ------------------------------------------------------------------------
-
-       @Override
-       public ListState<T> bind(StateBinder stateBinder) throws Exception {
-               throw new IllegalStateException("Cannot bind states with a legacy state descriptor.");
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-
-               return serializer.equals(that.serializer) && name.equals(that.name);
-
-       }
-
-       @Override
-       public int hashCode() {
-               int result = serializer.hashCode();
-               result = 31 * result + name.hashCode();
-               return result;
-       }
-
-       @Override
-       public String toString() {
-               return "ListStateDescriptor{" +
-                               "serializer=" + serializer +
-                               '}';
-       }
-
-       @Override
-       public org.apache.flink.api.common.state.StateDescriptor.Type getType() {
-               return org.apache.flink.api.common.state.StateDescriptor.Type.LIST;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
deleted file mode 100644
index 0b25e08..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint;
-
-import org.apache.flink.migration.runtime.state.StateHandle;
-import org.apache.flink.migration.util.SerializedValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-
-/**
- * Simple container class which contains the serialized state handle for a key group.
- *
- * The key group state handle is kept in serialized form because it can contain user code classes
- * which might not be available on the JobManager.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class KeyGroupState implements Serializable {
-       private static final long serialVersionUID = -5926696455438467634L;
-
-       private static final Logger LOG = LoggerFactory.getLogger(KeyGroupState.class);
-
-       private final SerializedValue<StateHandle<?>> keyGroupState;
-
-       private final long stateSize;
-
-       private final long duration;
-
-       public KeyGroupState(SerializedValue<StateHandle<?>> keyGroupState, long stateSize, long duration) {
-               this.keyGroupState = keyGroupState;
-
-               this.stateSize = stateSize;
-
-               this.duration = duration;
-       }
-
-       public SerializedValue<StateHandle<?>> getKeyGroupState() {
-               return keyGroupState;
-       }
-
-       public long getDuration() {
-               return duration;
-       }
-
-       public long getStateSize() {
-               return stateSize;
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof KeyGroupState) {
-                       KeyGroupState other = (KeyGroupState) obj;
-
-                       return keyGroupState.equals(other.keyGroupState) && stateSize == other.stateSize &&
-                               duration == other.duration;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               return (int) (this.stateSize ^ this.stateSize >>> 32) +
-                       31 * ((int) (this.duration ^ this.duration >>> 32) +
-                               31 * keyGroupState.hashCode());
-       }
-}

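The hashCode above folds each long field into an int with x ^ (x >>> 32) before combining the fields with the usual 31-based mixing. Since Java 8 that fold is exactly what Long.hashCode(long) computes, so the hand-written version can be checked against it:

    long stateSize = 123_456_789_012L;
    int folded = (int) (stateSize ^ (stateSize >>> 32));   // mixes both 32-bit halves
    assert folded == Long.hashCode(stateSize);             // identical by definition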
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
deleted file mode 100644
index d42d146..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint;
-
-import org.apache.flink.migration.runtime.state.StateHandle;
-import org.apache.flink.migration.util.SerializedValue;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class SubtaskState implements Serializable {
-
-       private static final long serialVersionUID = -2394696997971923995L;
-
-       /** The state of the parallel operator */
-       private final SerializedValue<StateHandle<?>> state;
-
-       /**
-        * The state size. This is also part of the deserialized state handle.
-        * We store it here in order to not deserialize the state handle when
-        * gathering stats.
-        */
-       private final long stateSize;
-
-       /** The duration until acknowledgement (ack timestamp - trigger timestamp). */
-       private final long duration;
-       
-       public SubtaskState(
-                       SerializedValue<StateHandle<?>> state,
-                       long stateSize,
-                       long duration) {
-
-               this.state = checkNotNull(state, "State");
-               // Sanity check and don't fail checkpoint because of this.
-               this.stateSize = stateSize >= 0 ? stateSize : 0;
-
-               this.duration = duration;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       
-       public SerializedValue<StateHandle<?>> getState() {
-               return state;
-       }
-
-       public long getStateSize() {
-               return stateSize;
-       }
-
-       public long getDuration() {
-               return duration;
-       }
-
-       public void discard(ClassLoader userClassLoader) throws Exception {
-
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               else if (o instanceof SubtaskState) {
-                       SubtaskState that = (SubtaskState) o;
-                       return this.state.equals(that.state) && stateSize == that.stateSize &&
-                               duration == that.duration;
-               }
-               else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               return (int) (this.stateSize ^ this.stateSize >>> 32) +
-                       31 * ((int) (this.duration ^ this.duration >>> 32) +
-                               31 * state.hashCode());
-       }
-
-       @Override
-       public String toString() {
-               return String.format("SubtaskState(Size: %d, Duration: %d, State: %s)", stateSize, duration, state);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
deleted file mode 100644
index c0a7b2d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint;
-
-import org.apache.flink.migration.runtime.state.StateHandle;
-import org.apache.flink.migration.util.SerializedValue;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class TaskState implements Serializable {
-
-       private static final long serialVersionUID = -4845578005863201810L;
-
-       private final JobVertexID jobVertexID;
-
-       /** Map of task states which can be accessed by their sub task index */
-       private final Map<Integer, SubtaskState> subtaskStates;
-
-       /** Map of key-value states which can be accessed by their key group index */
-       private final Map<Integer, KeyGroupState> kvStates;
-
-       /** Parallelism of the operator when it was checkpointed */
-       private final int parallelism;
-
-       public TaskState(JobVertexID jobVertexID, int parallelism) {
-               this.jobVertexID = jobVertexID;
-
-               this.subtaskStates = new HashMap<>(parallelism);
-
-               this.kvStates = new HashMap<>();
-
-               this.parallelism = parallelism;
-       }
-
-       public JobVertexID getJobVertexID() {
-               return jobVertexID;
-       }
-
-       public void putState(int subtaskIndex, SubtaskState subtaskState) {
-               if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
-                       throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
-                               " exceeds the maximum number of sub tasks " + subtaskStates.size());
-               } else {
-                       subtaskStates.put(subtaskIndex, subtaskState);
-               }
-       }
-
-       public SubtaskState getState(int subtaskIndex) {
-               if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
-                       throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
-                               " exceeds the maximum number of sub tasks " + subtaskStates.size());
-               } else {
-                       return subtaskStates.get(subtaskIndex);
-               }
-       }
-
-       public Collection<SubtaskState> getStates() {
-               return subtaskStates.values();
-       }
-
-       public Map<Integer, SubtaskState> getSubtaskStatesById() {
-               return subtaskStates;
-       }
-
-       public long getStateSize() {
-               long result = 0L;
-
-               for (SubtaskState subtaskState : subtaskStates.values()) {
-                       result += subtaskState.getStateSize();
-               }
-
-               for (KeyGroupState keyGroupState : kvStates.values()) {
-                       result += keyGroupState.getStateSize();
-               }
-
-               return result;
-       }
-
-       public int getNumberCollectedStates() {
-               return subtaskStates.size();
-       }
-
-       public int getParallelism() {
-               return parallelism;
-       }
-
-       public void putKvState(int keyGroupId, KeyGroupState keyGroupState) {
-               kvStates.put(keyGroupId, keyGroupState);
-       }
-
-       public KeyGroupState getKvState(int keyGroupId) {
-               return kvStates.get(keyGroupId);
-       }
-
-       /**
-        * Retrieve the set of key-value state key groups specified by the given key group partition set.
-        * The key groups are returned as a map where the key group index maps to the serialized state
-        * handle of the key group.
-        *
-        * @param keyGroupPartition Set of key group indices
-        * @return Map of serialized key group state handles indexed by their key group index.
-        */
-       public Map<Integer, SerializedValue<StateHandle<?>>> getUnwrappedKvStates(Set<Integer> keyGroupPartition) {
-               HashMap<Integer, SerializedValue<StateHandle<?>>> result = new HashMap<>(keyGroupPartition.size());
-
-               for (Integer keyGroupId : keyGroupPartition) {
-                       KeyGroupState keyGroupState = kvStates.get(keyGroupId);
-
-                       if (keyGroupState != null) {
-                               result.put(keyGroupId, kvStates.get(keyGroupId).getKeyGroupState());
-                       }
-               }
-
-               return result;
-       }
-
-       public int getNumberCollectedKvStates() {
-               return kvStates.size();
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof TaskState) {
-                       TaskState other = (TaskState) obj;
-
-                       return jobVertexID.equals(other.jobVertexID) && parallelism == other.parallelism &&
-                               subtaskStates.equals(other.subtaskStates) && kvStates.equals(other.kvStates);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, kvStates);
-       }
-}

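getUnwrappedKvStates above is a plain map projection: copy only the entries whose keys appear in the requested partition, silently skipping key groups without state. The same logic in generic form (plain Java sketch, java.util imports assumed; the helper name is made up):

    static <K, V> Map<K, V> project(Map<K, V> source, Set<K> keys) {
            Map<K, V> result = new HashMap<>(keys.size());
            for (K key : keys) {
                    V value = source.get(key);
                    if (value != null) {       // key group without state is skipped
                            result.put(key, value);
                    }
            }
            return result;
    }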
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
deleted file mode 100644
index 7888d2f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.checkpoint.savepoint;
-
-import org.apache.flink.migration.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-
-/**
- * Savepoint version 0.
- *
- * <p>This format was introduced with Flink 1.1.0.
- */
-@SuppressWarnings("deprecation")
-public class SavepointV0 implements Savepoint {
-
-       /** The savepoint version. */
-       public static final int VERSION = 0;
-
-       /** The checkpoint ID */
-       private final long checkpointId;
-
-       /** The task states */
-       private final Collection<TaskState> taskStates;
-
-       public SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
-               this.checkpointId = checkpointId;
-               this.taskStates = Preconditions.checkNotNull(taskStates, "Task States");
-       }
-
-       @Override
-       public int getVersion() {
-               return VERSION;
-       }
-
-       @Override
-       public long getCheckpointId() {
-               return checkpointId;
-       }
-
-       @Override
-       public Collection<org.apache.flink.runtime.checkpoint.TaskState> getTaskStates() {
-               // since checkpoints are never deserialized into this format,
-               // this method should never be called
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public Collection<MasterState> getMasterStates() {
-               // since checkpoints are never deserialized into this format,
-               // this method should never be called
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public Collection<OperatorState> getOperatorStates() {
-               return null;
-       }
-
-       @Override
-       public void dispose() throws Exception {
-               //NOP
-       }
-
-
-       public Collection<TaskState> getOldTaskStates() {
-               return taskStates;
-       }
-
-       @Override
-       public String toString() {
-               return "Savepoint(version=" + VERSION + ")";
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               SavepointV0 that = (SavepointV0) o;
-               return checkpointId == that.checkpointId && getTaskStates().equals(that.getTaskStates());
-       }
-
-       @Override
-       public int hashCode() {
-               int result = (int) (checkpointId ^ (checkpointId >>> 32));
-               result = 31 * result + taskStates.hashCode();
-               return result;
-       }
-}
