[FLINK-5846] [cep] Make the CEP operators backwards compatible with Flink 1.1


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/521a53d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/521a53d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/521a53d9

Branch: refs/heads/master
Commit: 521a53d9ad68a3f16a32e08843a6fca2bd4e439d
Parents: d998ff9
Author: kl0u <[email protected]>
Authored: Mon Feb 20 14:51:51 2017 +0100
Committer: kl0u <[email protected]>
Committed: Fri Mar 17 16:32:41 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  47 ++--
 .../AbstractKeyedCEPPatternOperator.java        |  85 +++++-
 .../flink/cep/operator/CEPOperatorUtils.java    |  12 +-
 .../cep/operator/KeyedCEPPatternOperator.java   |   5 +-
 .../TimeoutKeyedCEPPatternOperator.java         |   5 +-
 .../cep/operator/CEPMigration11to13Test.java    | 268 +++++++++++++++++++
 .../flink/cep/operator/CEPOperatorTest.java     |   6 +-
 .../flink/cep/operator/CEPRescalingTest.java    |   3 +-
 .../src/test/resources/cep-keyed-snapshot-1.1   | Bin 0 -> 5612 bytes
 .../test/resources/cep-non-keyed-snapshot-1.1   | Bin 0 -> 3274 bytes
 .../MultiplexingStreamRecordSerializer.java     | 229 ++++++++++++++++
 .../streamrecord/StreamRecordSerializer.java    | 150 +++++++++++
 pom.xml                                         |   2 +
 13 files changed, 769 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 8d87fd8..12afe4e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -119,6 +119,8 @@ public class NFA<T> implements Serializable {
         */
        private transient Queue<ComputationState<T>> computationStates;
 
+       private StateTransitionComparator<T>  stateTransitionComparator;
+
        public NFA(
                        final TypeSerializer<T> eventSerializer,
                        final long windowTime,
@@ -131,6 +133,7 @@ public class NFA<T> implements Serializable {
                this.computationStates = new LinkedList<>();
                this.states = new HashSet<>();
                this.startEventCounter = 1;
+               this.stateTransitionComparator =  new 
StateTransitionComparator<>();
        }
 
        public Set<State<T>> getStates() {
@@ -262,22 +265,6 @@ public class NFA<T> implements Serializable {
        }
 
        /**
-        * Comparator used for imposing the assumption that IGNORE is always 
the last StateTransition in a state.
-        */
-       private interface StateTransitionComparator<T> extends 
Comparator<StateTransition<T>>, Serializable {}
-       private final Comparator<StateTransition<T>> stateTransitionComparator 
= new StateTransitionComparator<T>() {
-               private static final long serialVersionUID = 
-2775474935413622278L;
-
-               @Override
-               public int compare(final StateTransition<T> o1, final 
StateTransition<T> o2) {
-                       if (o1.getAction() == o2.getAction()) {
-                               return 0;
-                       }
-                       return o1.getAction() == StateTransitionAction.IGNORE ? 
1 : -1;
-               }
-       };
-
-       /**
         * Computes the next computation states based on the given computation 
state, the current event,
         * its timestamp and the internal state machine.
         *
@@ -301,6 +288,13 @@ public class NFA<T> implements Serializable {
                        State<T> currentState = states.pop();
                        final List<StateTransition<T>> stateTransitions = new 
ArrayList<>(currentState.getStateTransitions());
 
+                       // this is for when we restore from legacy. In that 
case, the comparator is null
+                       // as it did not exist in the previous Flink versions, 
so we have to initialize it here.
+
+                       if (stateTransitionComparator == null) {
+                               stateTransitionComparator = new 
StateTransitionComparator();
+                       }
+
                        // impose the IGNORE will be processed last
                        Collections.sort(stateTransitions, 
stateTransitionComparator);
 
@@ -601,10 +595,7 @@ public class NFA<T> implements Serializable {
                        ObjectInputStream ois = new ObjectInputStream(new 
DataInputViewStream(source));
 
                        try {
-                               @SuppressWarnings("unchecked")
-                               NFA<T> nfa = null;
-                               nfa = (NFA<T>) ois.readObject();
-                               return nfa;
+                               return (NFA<T>) ois.readObject();
                        } catch (ClassNotFoundException e) {
                                throw new RuntimeException("Could not 
deserialize NFA.", e);
                        }
@@ -637,4 +628,20 @@ public class NFA<T> implements Serializable {
                        return getClass().hashCode();
                }
        }
+
+       /**
+        * Comparator used for imposing the assumption that IGNORE is always 
the last StateTransition in a state.
+        */
+       private static final class StateTransitionComparator<T> implements 
Serializable, Comparator<StateTransition<T>> {
+
+               private static final long serialVersionUID = 
-2775474935413622278L;
+
+               @Override
+               public int compare(final StateTransition<T> o1, final 
StateTransition<T> o2) {
+                       if (o1.getAction() == o2.getAction()) {
+                               return 0;
+                       }
+                       return o1.getAction() == StateTransitionAction.IGNORE ? 
1 : -1;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index de7daea..b6d57cd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -25,19 +25,25 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.state.StateInitializationContext;
+import 
org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import 
org.apache.flink.streaming.api.operators.InternalWatermarkCallbackService;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OnWatermarkCallback;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.Objects;
 import java.util.PriorityQueue;
@@ -55,7 +61,7 @@ import java.util.PriorityQueue;
  */
 public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
        extends AbstractStreamOperator<OUT>
-       implements OneInputStreamOperator<IN, OUT> {
+       implements OneInputStreamOperator<IN, OUT>, 
CheckpointedRestoringOperator {
 
        private static final long serialVersionUID = -4166778210774160757L;
 
@@ -82,18 +88,26 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
        private final PriorityQueueFactory<StreamRecord<IN>> 
priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
        private final NFACompiler.NFAFactory<IN> nfaFactory;
 
+       /**
+        * A flag used in the case of migration that indicates if
+        * we are restoring from an old keyed or non-keyed operator.
+        */
+       private final boolean migratingFromOldKeyedOperator;
+
        public AbstractKeyedCEPPatternOperator(
                        final TypeSerializer<IN> inputSerializer,
                        final boolean isProcessingTime,
                        final KeySelector<IN, KEY> keySelector,
                        final TypeSerializer<KEY> keySerializer,
-                       final NFACompiler.NFAFactory<IN> nfaFactory) {
+                       final NFACompiler.NFAFactory<IN> nfaFactory,
+                       final boolean migratingFromOldKeyedOperator) {
 
                this.inputSerializer = 
Preconditions.checkNotNull(inputSerializer);
                this.isProcessingTime = 
Preconditions.checkNotNull(isProcessingTime);
                this.keySelector = Preconditions.checkNotNull(keySelector);
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
                this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+               this.migratingFromOldKeyedOperator = 
migratingFromOldKeyedOperator;
        }
 
        public TypeSerializer<IN> getInputSerializer() {
@@ -104,15 +118,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
        public void initializeState(StateInitializationContext context) throws 
Exception {
                super.initializeState(context);
 
-               // we have to call initializeState here and in the migration 
restore()
-               // method because the restore() (from legacy) is called before 
the
-               // initializeState().
-
-               initializeState();
-       }
-
-       private void initializeState() {
-
                if (nfaOperatorState == null) {
                        nfaOperatorState = getRuntimeContext().getState(
                                new ValueStateDescriptor<>(
@@ -252,6 +257,57 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
         */
        protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
 
+       //////////////////////                  Backwards Compatibility         
        //////////////////////
+
+       @Override
+       public void restoreState(FSDataInputStream in) throws Exception {
+               // this is the flag indicating if we have udf
+               // state to restore (not needed here)
+               in.read();
+
+               DataInputViewStreamWrapper inputView = new 
DataInputViewStreamWrapper(in);
+               InternalWatermarkCallbackService<KEY> watermarkCallbackService 
= getInternalWatermarkCallbackService();
+
+               if (migratingFromOldKeyedOperator) {
+                       int numberEntries = inputView.readInt();
+                       for (int i = 0; i <numberEntries; i++) {
+                               
watermarkCallbackService.registerKeyForWatermarkCallback(keySerializer.deserialize(inputView));
+                       }
+               } else {
+
+                       final ObjectInputStream ois = new ObjectInputStream(in);
+
+                       // retrieve the NFA
+                       @SuppressWarnings("unchecked")
+                       NFA<IN> nfa = (NFA<IN>) ois.readObject();
+
+                       // retrieve the elements that were pending in the 
priority queue
+                       MultiplexingStreamRecordSerializer<IN> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(inputSerializer);
+                       PriorityQueue<StreamRecord<IN>> priorityQueue = 
priorityQueueFactory.createPriorityQueue();
+                       int entries = ois.readInt();
+                       for (int i = 0; i < entries; i++) {
+                               StreamElement streamElement = 
recordSerializer.deserialize(inputView);
+                               
priorityQueue.offer(streamElement.<IN>asRecord());
+                       }
+
+                       // finally register the retrieved state with the new 
keyed state.
+                       setCurrentKey((byte) 0);
+                       nfaOperatorState.update(nfa);
+                       priorityQueueOperatorState.update(priorityQueue);
+
+                       if (!isProcessingTime) {
+                               // this is relevant only for event/ingestion 
time
+
+                               // need to work around type restrictions
+                               InternalWatermarkCallbackService 
rawWatermarkCallbackService =
+                                       (InternalWatermarkCallbackService) 
watermarkCallbackService;
+
+                               
rawWatermarkCallbackService.registerKeyForWatermarkCallback((byte) 0);
+                       }
+                       ois.close();
+               }
+       }
+
        //////////////////////                  Utility Classes                 
//////////////////////
 
        /**
@@ -348,6 +404,11 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
                }
 
                @Override
+               public boolean isCompatibleWith(TypeSerializer<?> other) {
+                       return equals(other) || other instanceof 
AbstractKeyedCEPPatternOperator.PriorityQueueSerializer;
+               }
+
+               @Override
                public boolean equals(Object obj) {
                        if (obj instanceof PriorityQueueSerializer) {
                                @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 56ecb17..a5eef45 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -72,7 +72,8 @@ public class CEPOperatorUtils {
                                        isProcessingTime,
                                        keySelector,
                                        keySerializer,
-                                       nfaFactory));
+                                       nfaFactory,
+                                       true));
                } else {
 
                        KeySelector<T, Byte> keySelector = new 
NullByteKeySelector<>();
@@ -86,7 +87,8 @@ public class CEPOperatorUtils {
                                        isProcessingTime,
                                        keySelector,
                                        keySerializer,
-                                       nfaFactory
+                                       nfaFactory,
+                                       false
                                )).forceNonParallel();
                }
 
@@ -133,7 +135,8 @@ public class CEPOperatorUtils {
                                        isProcessingTime,
                                        keySelector,
                                        keySerializer,
-                                       nfaFactory));
+                                       nfaFactory,
+                                       true));
                } else {
 
                        KeySelector<T, Byte> keySelector = new 
NullByteKeySelector<>();
@@ -147,7 +150,8 @@ public class CEPOperatorUtils {
                                        isProcessingTime,
                                        keySelector,
                                        keySerializer,
-                                       nfaFactory
+                                       nfaFactory,
+                                       false
                                )).forceNonParallel();
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 5b6ffe2..21cee23 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -45,9 +45,10 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
                        boolean isProcessingTime,
                        KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
-                       NFACompiler.NFAFactory<IN> nfaFactory) {
+                       NFACompiler.NFAFactory<IN> nfaFactory,
+                       boolean migratingFromOldKeyedOperator) {
 
-               super(inputSerializer, isProcessingTime, keySelector, 
keySerializer, nfaFactory);
+               super(inputSerializer, isProcessingTime, keySelector, 
keySerializer, nfaFactory, migratingFromOldKeyedOperator);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 6889bb9..c6fba55 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -45,9 +45,10 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
                        boolean isProcessingTime,
                        KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
-                       NFACompiler.NFAFactory<IN> nfaFactory) {
+                       NFACompiler.NFAFactory<IN> nfaFactory,
+                       boolean migratingFromOldKeyedOperator) {
 
-               super(inputSerializer, isProcessingTime, keySelector, 
keySerializer, nfaFactory);
+               super(inputSerializer, isProcessingTime, keySelector, 
keySerializer, nfaFactory, migratingFromOldKeyedOperator);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
new file mode 100644
index 0000000..5a3e623
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CEPMigration11to13Test {
+
+       private static String getResourceFilename(String filename) {
+               ClassLoader cl = CEPMigration11to13Test.class.getClassLoader();
+               URL resource = cl.getResource(filename);
+               if (resource == null) {
+                       throw new NullPointerException("Missing snapshot 
resource.");
+               }
+               return resource.getFile();
+       }
+
+       @Test
+       public void testKeyedCEPOperatorMigratation() throws Exception {
+
+               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
+                       private static final long serialVersionUID = 
-4873366487571254798L;
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               };
+
+               final Event startEvent = new Event(42, "start", 1.0);
+               final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+               final Event endEvent = new Event(42, "end", 1.0);
+
+               // uncomment these lines for regenerating the snapshot on Flink 
1.1
+               /*
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = new OneInputStreamOperatorTestHarness<>(
+                               new KeyedCEPPatternOperator<>(
+                                               Event.createTypeSerializer(),
+                                               false,
+                                               keySelector,
+                                               IntSerializer.INSTANCE,
+                                               new NFAFactory()));
+               harness.configureForKeyedStream(keySelector, 
BasicTypeInfo.INT_TYPE_INFO);
+               harness.open();
+               harness.processElement(new StreamRecord<Event>(startEvent, 1));
+               harness.processElement(new StreamRecord<Event>(new Event(42, 
"foobar", 1.0), 2));
+               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
+               harness.processWatermark(new Watermark(2));
+               // simulate snapshot/restore with empty element queue but NFA 
state
+               StreamTaskState snapshot = harness.snapshot(1, 1);
+               FileOutputStream out = new FileOutputStream(
+                               "src/test/resources/cep-keyed-snapshot-1.1");
+               ObjectOutputStream oos = new ObjectOutputStream(out);
+               oos.writeObject(snapshot);
+               out.close();
+               harness.close();
+               */
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               new KeyedCEPPatternOperator<>(
+                                                               
Event.createTypeSerializer(),
+                                                               false,
+                                                               keySelector,
+                                                               
IntSerializer.INSTANCE,
+                                                               new 
NFAFactory(),
+                                                               true),
+                                               keySelector,
+                                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               
harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-snapshot-1.1"));
+               harness.open();
+
+               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+               harness.processElement(new StreamRecord<>(new Event(42, 
"start", 1.0), 4));
+               harness.processElement(new StreamRecord<>(endEvent, 5));
+
+               harness.processWatermark(new Watermark(20));
+
+               ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+               // watermark and the result
+               assertEquals(2, result.size());
+
+               Object resultObject = result.poll();
+               assertTrue(resultObject instanceof StreamRecord);
+               StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+               assertTrue(resultRecord.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
+
+               assertEquals(startEvent, patternMap.get("start"));
+               assertEquals(middleEvent, patternMap.get("middle"));
+               assertEquals(endEvent, patternMap.get("end"));
+
+               harness.close();
+       }
+
+       @Test
+       public void testNonKeyedCEPFunctionMigration() throws Exception {
+
+               final Event startEvent = new Event(42, "start", 1.0);
+               final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+               final Event endEvent=  new Event(42, "end", 1.0);
+
+               // uncomment these lines for regenerating the snapshot on Flink 
1.1
+               /*
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = new OneInputStreamOperatorTestHarness<>(
+                               new CEPPatternOperator<>(
+                                               Event.createTypeSerializer(),
+                                               false,
+                                               new NFAFactory()));
+               harness.open();
+               harness.processElement(new StreamRecord<Event>(startEvent, 1));
+               harness.processElement(new StreamRecord<Event>(new Event(42, 
"foobar", 1.0), 2));
+               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
+               harness.processWatermark(new Watermark(2));
+
+               // simulate snapshot/restore with empty element queue but NFA 
state
+               StreamTaskState snapshot = harness.snapshot(1, 1);
+               FileOutputStream out = new FileOutputStream(
+                               
"src/test/resources/cep-non-keyed-snapshot-1.1");
+               ObjectOutputStream oos = new ObjectOutputStream(out);
+               oos.writeObject(snapshot);
+               out.close();
+               harness.close();
+               */
+
+               NullByteKeySelector keySelector = new NullByteKeySelector();
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               new KeyedCEPPatternOperator<>(
+                                                               
Event.createTypeSerializer(),
+                                                               false,
+                                                               keySelector,
+                                                               
ByteSerializer.INSTANCE,
+                                                               new 
NFAFactory(),
+                                                               false),
+                                               keySelector,
+                                               BasicTypeInfo.BYTE_TYPE_INFO);
+
+               harness.setup();
+               
harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-snapshot-1.1"));
+               harness.open();
+
+               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+               harness.processElement(new StreamRecord<>(new Event(42, 
"start", 1.0), 4));
+               harness.processElement(new StreamRecord<>(endEvent, 5));
+
+               harness.processWatermark(new Watermark(Long.MAX_VALUE));
+
+               ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+               // watermark and the result
+               assertEquals(2, result.size());
+
+               Object resultObject = result.poll();
+               assertTrue(resultObject instanceof StreamRecord);
+               StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+               assertTrue(resultRecord.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
+
+               assertEquals(startEvent, patternMap.get("start"));
+               assertEquals(middleEvent, patternMap.get("middle"));
+               assertEquals(endEvent, patternMap.get("end"));
+
+               harness.close();
+       }
+
+       private static class NFAFactory implements 
NFACompiler.NFAFactory<Event> {
+
+               private static final long serialVersionUID = 
1173020762472766713L;
+
+               private final boolean handleTimeout;
+
+               private NFAFactory() {
+                       this(false);
+               }
+
+               private NFAFactory(boolean handleTimeout) {
+                       this.handleTimeout = handleTimeout;
+               }
+
+               @Override
+               public NFA<Event> createNFA() {
+
+                       Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new StartFilter())
+                                       
.followedBy("middle").subtype(SubEvent.class).where(new MiddleFilter())
+                                       .followedBy("end").where(new 
EndFilter())
+                                       // add a window timeout to test whether 
timestamps of elements in the
+                                       // priority queue in CEP operator are 
correctly checkpointed/restored
+                                       .within(Time.milliseconds(10L));
+
+                       return NFACompiler.compile(pattern, 
Event.createTypeSerializer(), handleTimeout);
+               }
+       }
+
+       private static class StartFilter implements FilterFunction<Event> {
+               private static final long serialVersionUID = 
5726188262756267490L;
+
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("start");
+               }
+       }
+
+       private static class MiddleFilter implements FilterFunction<SubEvent> {
+               private static final long serialVersionUID = 
6215754202506583964L;
+
+               @Override
+               public boolean filter(SubEvent value) throws Exception {
+                       return value.getVolume() > 5.0;
+               }
+       }
+
+       private static class EndFilter implements FilterFunction<Event> {
+               private static final long serialVersionUID = 
7056763917392056548L;
+
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("end");
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 4ae74b9..a99db05 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -224,7 +224,8 @@ public class CEPOperatorTest extends TestLogger {
                                false,
                                keySelector,
                                IntSerializer.INSTANCE,
-                               new NFAFactory(true)),
+                               new NFAFactory(true),
+                               true),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO);
 
@@ -480,7 +481,8 @@ public class CEPOperatorTest extends TestLogger {
                        isProcessingTime,
                        keySelector,
                        IntSerializer.INSTANCE,
-                       new NFAFactory());
+                       new NFAFactory(),
+                       true);
        }
 
        private static class TestKeySelector implements KeySelector<Event, 
Integer> {

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 78765c0..399662a 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -345,7 +345,8 @@ public class CEPRescalingTest {
                                false,
                                keySelector,
                                
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               new NFAFactory()),
+                               new NFAFactory(),
+                               true),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism,

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 
b/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1
new file mode 100644
index 0000000..277de1d
Binary files /dev/null and 
b/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 
b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1
new file mode 100644
index 0000000..b5ca51e
Binary files /dev/null and 
b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
new file mode 100644
index 0000000..a0f5a60
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+public class MultiplexingStreamRecordSerializer<T> extends 
TypeSerializer<StreamElement> {
+
+
+       private static final long serialVersionUID = 1L;
+
+       private static final int TAG_REC_WITH_TIMESTAMP = 0;
+       private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
+       private static final int TAG_WATERMARK = 2;
+
+
+       private final TypeSerializer<T> typeSerializer;
+
+
+       /**
+        * Creates a serializer for stream elements whose record values are handled
+        * by the given value serializer.
+        *
+        * @param serializer serializer for the values carried inside the records
+        * @throws RuntimeException if {@code serializer} is itself a
+        *         {@link MultiplexingStreamRecordSerializer} or a {@link StreamRecordSerializer}
+        * @throws NullPointerException if {@code serializer} is null
+        */
+       public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
+               // nesting a stream record serializer as the value serializer is always a wiring mistake
+               if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
+                       throw new RuntimeException("Stream record serializer given to MultiplexingStreamRecordSerializer as value TypeSerializer: " + serializer);
+               }
+               this.typeSerializer = requireNonNull(serializer);
+       }
+
+       public TypeSerializer<T> getContainedTypeSerializer() {
+               return this.typeSerializer;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public MultiplexingStreamRecordSerializer<T> duplicate() {
+               TypeSerializer<T> copy = typeSerializer.duplicate();
+               return (copy == typeSerializer) ? this : new 
MultiplexingStreamRecordSerializer<T>(copy);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Type serialization, copying, instantiation
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public StreamRecord<T> createInstance() {
+               return new StreamRecord<T>(typeSerializer.createInstance());
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       /**
+        * Copies a stream element.
+        *
+        * <p>Records are copied via {@code StreamRecord#copy} around a copy of their value
+        * produced by the value serializer. Watermarks are returned as-is.
+        *
+        * @param from the element to copy
+        * @return the copied record, or the same watermark instance
+        * @throws RuntimeException if {@code from} is neither a record nor a watermark
+        */
+       @Override
+       public StreamElement copy(StreamElement from) {
+               if (from.isRecord()) {
+                       // only the value needs a deep copy; the record copy is built around it
+                       StreamRecord<T> fromRecord = from.asRecord();
+                       return fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
+               }
+               else if (from.isWatermark()) {
+                       // is immutable
+                       return from;
+               }
+               else {
+                       // this serializer only knows records and watermarks
+                       throw new RuntimeException("Cannot copy " + from);
+               }
+       }
+
+       @Override
+       public StreamElement copy(StreamElement from, StreamElement reuse) {
+               if (from.isRecord() && reuse.isRecord()) {
+                       StreamRecord<T> fromRecord = from.asRecord();
+                       StreamRecord<T> reuseRecord = reuse.asRecord();
+
+                       T valueCopy = 
typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
+                       fromRecord.copyTo(valueCopy, reuseRecord);
+                       return reuse;
+               }
+               else if (from.isWatermark()) {
+                       // is immutable
+                       return from;
+               }
+               else {
+                       throw new RuntimeException("Cannot copy " + from + " -> 
" + reuse);
+               }
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               int tag = source.readByte();
+               target.write(tag);
+
+               if (tag == TAG_REC_WITH_TIMESTAMP) {
+                       // move timestamp
+                       target.writeLong(source.readLong());
+                       typeSerializer.copy(source, target);
+               }
+               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+                       typeSerializer.copy(source, target);
+               }
+               else if (tag == TAG_WATERMARK) {
+                       target.writeLong(source.readLong());
+               }
+               else {
+                       throw new IOException("Corrupt stream, found tag: " + 
tag);
+               }
+       }
+
+       /**
+        * Writes a stream element as a tag followed by its payload.
+        *
+        * <ul>
+        *   <li>record with timestamp: {@code TAG_REC_WITH_TIMESTAMP}, the timestamp as a long, the value</li>
+        *   <li>record without timestamp: {@code TAG_REC_WITHOUT_TIMESTAMP}, the value</li>
+        *   <li>watermark: {@code TAG_WATERMARK}, the watermark timestamp as a long</li>
+        * </ul>
+        *
+        * @param value the element to write
+        * @param target the output to write to
+        * @throws IOException if writing to {@code target} fails
+        * @throws RuntimeException if {@code value} is neither a record nor a watermark
+        */
+       @Override
+       public void serialize(StreamElement value, DataOutputView target) throws IOException {
+               if (value.isRecord()) {
+                       StreamRecord<T> record = value.asRecord();
+
+                       if (record.hasTimestamp()) {
+                               target.write(TAG_REC_WITH_TIMESTAMP);
+                               target.writeLong(record.getTimestamp());
+                       } else {
+                               target.write(TAG_REC_WITHOUT_TIMESTAMP);
+                       }
+                       typeSerializer.serialize(record.getValue(), target);
+               }
+               else if (value.isWatermark()) {
+                       target.write(TAG_WATERMARK);
+                       target.writeLong(value.asWatermark().getTimestamp());
+               }
+               else {
+                       // no tag exists for other element kinds, so fail with a diagnostic
+                       throw new RuntimeException("Cannot serialize " + value);
+               }
+       }
+
+       @Override
+       public StreamElement deserialize(DataInputView source) throws 
IOException {
+               int tag = source.readByte();
+               if (tag == TAG_REC_WITH_TIMESTAMP) {
+                       long timestamp = source.readLong();
+                       return new 
StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
+               }
+               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+                       return new 
StreamRecord<T>(typeSerializer.deserialize(source));
+               }
+               else if (tag == TAG_WATERMARK) {
+                       return new Watermark(source.readLong());
+               }
+               else {
+                       throw new IOException("Corrupt stream, found tag: " + 
tag);
+               }
+       }
+
+       @Override
+       public StreamElement deserialize(StreamElement reuse, DataInputView 
source) throws IOException {
+               int tag = source.readByte();
+               if (tag == TAG_REC_WITH_TIMESTAMP) {
+                       long timestamp = source.readLong();
+                       T value = typeSerializer.deserialize(source);
+                       StreamRecord<T> reuseRecord = reuse.asRecord();
+                       reuseRecord.replace(value, timestamp);
+                       return reuseRecord;
+               }
+               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+                       T value = typeSerializer.deserialize(source);
+                       StreamRecord<T> reuseRecord = reuse.asRecord();
+                       reuseRecord.replace(value);
+                       return reuseRecord;
+               }
+               else if (tag == TAG_WATERMARK) {
+                       return new Watermark(source.readLong());
+               }
+               else {
+                       throw new IOException("Corrupt stream, found tag: " + 
tag);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Equality and hashing
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof MultiplexingStreamRecordSerializer) {
+                       MultiplexingStreamRecordSerializer<?> other = 
(MultiplexingStreamRecordSerializer<?>) obj;
+
+                       return other.canEqual(this) && 
typeSerializer.equals(other.typeSerializer);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof MultiplexingStreamRecordSerializer;
+       }
+
+       @Override
+       public int hashCode() {
+               return typeSerializer.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
new file mode 100644
index 0000000..0235ab8
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.runtime.streamrecord;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Serializer for {@link StreamRecord}. This version ignores timestamps and 
only deals with
+ * the element.
+ *
+ * <p>
+ * {@link MultiplexingStreamRecordSerializer} is a version that deals with 
timestamps and also
+ * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark 
Watermarks} in the same
+ * stream with {@link StreamRecord StreamRecords}.
+ *
+ * @see MultiplexingStreamRecordSerializer
+ *
+ * @param <T> The type of value in the {@link StreamRecord}
+ */
+@Internal
+public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord<T>> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final TypeSerializer<T> typeSerializer;
+       
+
+       public StreamRecordSerializer(TypeSerializer<T> serializer) {
+               if (serializer instanceof StreamRecordSerializer) {
+                       throw new RuntimeException("StreamRecordSerializer 
given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+               }
+               this.typeSerializer = Preconditions.checkNotNull(serializer);
+       }
+
+       public TypeSerializer<T> getContainedTypeSerializer() {
+               return this.typeSerializer;
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  General serializer and type utils
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public StreamRecordSerializer<T> duplicate() {
+               TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
+               return serializerCopy == typeSerializer ? this : new 
StreamRecordSerializer<T>(serializerCopy);
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public int getLength() {
+               return typeSerializer.getLength();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Type serialization, copying, instantiation
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public StreamRecord<T> createInstance() {
+               try {
+                       return new 
StreamRecord<T>(typeSerializer.createInstance());
+               } catch (Exception e) {
+                       throw new RuntimeException("Cannot instantiate 
StreamRecord.", e);
+               }
+       }
+       
+       @Override
+       public StreamRecord<T> copy(StreamRecord<T> from) {
+               return from.copy(typeSerializer.copy(from.getValue()));
+       }
+
+       @Override
+       public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> 
reuse) {
+               from.copyTo(typeSerializer.copy(from.getValue(), 
reuse.getValue()), reuse);
+               return reuse;
+       }
+
+       @Override
+       public void serialize(StreamRecord<T> value, DataOutputView target) 
throws IOException {
+               typeSerializer.serialize(value.getValue(), target);
+       }
+       
+       @Override
+       public StreamRecord<T> deserialize(DataInputView source) throws 
IOException {
+               return new StreamRecord<T>(typeSerializer.deserialize(source));
+       }
+
+       @Override
+       public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView 
source) throws IOException {
+               T element = typeSerializer.deserialize(reuse.getValue(), 
source);
+               reuse.replace(element);
+               return reuse;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               typeSerializer.copy(source, target);
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof StreamRecordSerializer) {
+                       StreamRecordSerializer<?> other = 
(StreamRecordSerializer<?>) obj;
+
+                       return other.canEqual(this) && 
typeSerializer.equals(other.typeSerializer);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof StreamRecordSerializer;
+       }
+
+       @Override
+       public int hashCode() {
+               return typeSerializer.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 661a72d..538aa4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -911,6 +911,8 @@ under the License.
                                                
<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot</exclude>
                                                
<exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint</exclude>
                                                
<exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb</exclude>
+                                               
<exclude>flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1</exclude>
+                                               
<exclude>flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1</exclude>
 
                                                <!-- TweetInputFormat Test 
Data-->
                                                
<exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude>

Reply via email to