Repository: flink
Updated Branches:
  refs/heads/master dd8ef550c -> 15ae922ad


[FLINK-5845] [cep] Unify keyed and non-keyed operators.

All CEP operators are now keyed. For non-keyed use cases, the stream is
keyed on a constant dummy key so that the keyed operator can be reused.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15ae922a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15ae922a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15ae922a

Branch: refs/heads/master
Commit: 15ae922ad4151701cbb4e0df207f43d0094366d1
Parents: dd8ef55
Author: kl0u <[email protected]>
Authored: Thu Feb 16 12:02:25 2017 +0100
Committer: kl0u <[email protected]>
Committed: Fri Feb 24 11:10:40 2017 +0100

----------------------------------------------------------------------
 .../flink/api/java/functions/KeySelector.java   |   2 +-
 .../api/java/functions/NullByteKeySelector.java |  39 ++++
 .../AbstractCEPBasePatternOperator.java         | 108 ------------
 .../operator/AbstractCEPPatternOperator.java    | 142 ---------------
 .../AbstractKeyedCEPPatternOperator.java        | 128 ++++++++++----
 .../flink/cep/operator/CEPOperatorUtils.java    |  23 ++-
 .../flink/cep/operator/CEPPatternOperator.java  |  76 --------
 .../cep/operator/KeyedCEPPatternOperator.java   |  22 +--
 .../cep/operator/TimeoutCEPPatternOperator.java |  94 ----------
 .../TimeoutKeyedCEPPatternOperator.java         |  43 ++---
 .../flink/cep/operator/CEPOperatorTest.java     | 176 -------------------
 .../api/datastream/AllWindowedStream.java       |  14 +-
 12 files changed, 186 insertions(+), 681 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java 
b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
index d96f078..da3b3e2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
  */
 @Public
 public interface KeySelector<IN, KEY> extends Function, Serializable {
-       
+
        /**
         * User-defined function that extracts the key from an arbitrary object.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java
new file mode 100644
index 0000000..4aa533d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Used as a dummy {@link KeySelector} to allow using keyed operators
+ * for non-keyed usecases. Essentially, it gives all incoming records
+ * the same key, which is a {@code (byte) 0} value.
+ *
+ * @param <T> The type of the input element.
+ */
+@Internal
+public class NullByteKeySelector<T> implements KeySelector<T, Byte> {
+
+       private static final long serialVersionUID = 614256539098549020L;
+
+       @Override
+       public Byte getKey(T value) throws Exception {
+               return 0;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
deleted file mode 100644
index a3497a6..0000000
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-import java.util.PriorityQueue;
-
-/**
- * Base class for CEP pattern operator. The operator uses a {@link NFA} to 
detect complex event
- * patterns. The detected event patterns are then outputted to the down stream 
operators.
- *
- * @param <IN> Type of the input elements
- * @param <OUT> Type fo the output elements
- */
-public abstract class AbstractCEPBasePatternOperator<IN, OUT>
-       extends AbstractStreamOperator<OUT>
-       implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
-
-       private static final long serialVersionUID = -4166778210774160757L;
-
-       protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
-
-       private final TypeSerializer<IN> inputSerializer;
-       private final boolean isProcessingTime;
-
-       public AbstractCEPBasePatternOperator(
-                       final TypeSerializer<IN> inputSerializer,
-                       final boolean isProcessingTime) {
-               this.inputSerializer = inputSerializer;
-               this.isProcessingTime = isProcessingTime;
-       }
-
-       public TypeSerializer<IN> getInputSerializer() {
-               return inputSerializer;
-       }
-
-       protected abstract NFA<IN> getNFA() throws IOException;
-
-       protected abstract void updateNFA(NFA<IN> nfa) throws IOException;
-
-       protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() 
throws IOException;
-
-       protected abstract void 
updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException;
-
-       @Override
-       public void processElement(StreamRecord<IN> element) throws Exception {
-               if (isProcessingTime) {
-                       // there can be no out of order elements in processing 
time
-                       NFA<IN> nfa = getNFA();
-                       processEvent(nfa, element.getValue(), 
System.currentTimeMillis());
-                       updateNFA(nfa);
-               } else {
-                       PriorityQueue<StreamRecord<IN>> priorityQueue = 
getPriorityQueue();
-
-                       // event time processing
-                       // we have to buffer the elements until we receive the 
proper watermark
-                       if (getExecutionConfig().isObjectReuseEnabled()) {
-                               // copy the StreamRecord so that it cannot be 
changed
-                               priorityQueue.offer(new 
StreamRecord<IN>(inputSerializer.copy(element.getValue()), 
element.getTimestamp()));
-                       } else {
-                               priorityQueue.offer(element);
-                       }
-                       updatePriorityQueue(priorityQueue);
-               }
-       }
-
-       /**
-        * Process the given event by giving it to the NFA and outputting the 
produced set of matched
-        * event sequences.
-        *
-        * @param nfa NFA to be used for the event detection
-        * @param event The current event to be processed
-        * @param timestamp The timestamp of the event
-        */
-       protected abstract void processEvent(NFA<IN> nfa, IN event, long 
timestamp);
-
-       /**
-        * Advances the time for the given NFA to the given timestamp. This can 
lead to pruning and
-        * timeouts.
-        *
-        * @param nfa to advance the time for
-        * @param timestamp to advance the time to
-        */
-       protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
deleted file mode 100644
index fe9aced..0000000
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.PriorityQueue;
-
-/**
- * Abstract CEP pattern operator which is used for non keyed streams. 
Consequently,
- * the operator state only includes a single {@link NFA} and a priority queue 
to order out of order
- * elements in case of event time processing.
- *
- * @param <IN> Type of the input elements
- * @param <OUT> Type of the output elements
- */
-abstract public class AbstractCEPPatternOperator<IN, OUT> extends 
AbstractCEPBasePatternOperator<IN, OUT> {
-       private static final long serialVersionUID = 7487334510746595640L;
-
-       private final StreamElementSerializer<IN> streamRecordSerializer;
-
-       // global nfa for all elements
-       private NFA<IN> nfa;
-
-       // queue to buffer out of order stream records
-       private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
-
-       public AbstractCEPPatternOperator(
-                       TypeSerializer<IN> inputSerializer,
-                       boolean isProcessingTime,
-                       NFACompiler.NFAFactory<IN> nfaFactory) {
-               super(inputSerializer, isProcessingTime);
-
-               this.streamRecordSerializer = new 
StreamElementSerializer<>(inputSerializer);
-               this.nfa = nfaFactory.createNFA();
-       }
-
-       @Override
-       public void open() throws Exception {
-               super.open();
-               if (priorityQueue == null) {
-                       priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new 
StreamRecordComparator<IN>());
-               }
-       }
-
-       @Override
-       protected NFA<IN> getNFA() throws IOException {
-               return nfa;
-       }
-
-       @Override
-       protected void updateNFA(NFA<IN> nfa) {
-               // a no-op, because we only have one NFA
-       }
-
-       @Override
-       protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws 
IOException {
-               return priorityQueue;
-       }
-
-       @Override
-       protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> 
queue) {
-               // a no-op, because we only have one priority queue
-       }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               // we do our own watermark handling, no super call. we will 
never be able to use
-               // the timer service like this, however.
-
-               if (priorityQueue.isEmpty()) {
-                       advanceTime(nfa, mark.getTimestamp());
-               } else {
-                       while (!priorityQueue.isEmpty() && 
priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
-                               StreamRecord<IN> streamRecord = 
priorityQueue.poll();
-
-                               processEvent(nfa, streamRecord.getValue(), 
streamRecord.getTimestamp());
-                       }
-               }
-
-               output.emitWatermark(mark);
-       }
-
-       @Override
-       public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
-               final ObjectOutputStream oos = new ObjectOutputStream(out);
-
-               oos.writeObject(nfa);
-               oos.writeInt(priorityQueue.size());
-
-               for (StreamRecord<IN> streamRecord: priorityQueue) {
-                       streamRecordSerializer.serialize(streamRecord, new 
DataOutputViewStreamWrapper(oos));
-               }
-               oos.flush();
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public void restoreState(FSDataInputStream state) throws Exception {
-               final ObjectInputStream ois = new ObjectInputStream(state);
-
-               nfa = (NFA<IN>)ois.readObject();
-
-               int numberPriorityQueueEntries = ois.readInt();
-
-               priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new 
StreamRecordComparator<IN>());
-
-               for (int i = 0; i <numberPriorityQueueEntries; i++) {
-                       StreamElement streamElement = 
streamRecordSerializer.deserialize(new DataInputViewStreamWrapper(ois));
-                       priorityQueue.offer(streamElement.<IN>asRecord());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 832a0ba..90ee846 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -30,9 +30,13 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -44,7 +48,7 @@ import java.util.Set;
 /**
  * Abstract CEP pattern operator for a keyed input stream. For each key, the 
operator creates
  * a {@link NFA} and a priority queue to buffer out of order elements. Both 
data structures are
- * stored using the key value state. Additionally, the set of all seen keys is 
kept as part of the
+ * stored using the managed keyed state. Additionally, the set of all seen 
keys is kept as part of the
  * operator state. This is necessary to trigger the execution for all keys 
upon receiving a new
  * watermark.
  *
@@ -52,11 +56,17 @@ import java.util.Set;
  * @param <KEY> Type of the key on which the input stream is keyed
  * @param <OUT> Type of the output elements
  */
-abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends 
AbstractCEPBasePatternOperator<IN, OUT> {
-       private static final long serialVersionUID = -7234999752950159178L;
+public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
+       extends AbstractStreamOperator<OUT>
+       implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
 
-       private static final String NFA_OPERATOR_STATE_NAME = 
"nfaOperatorState";
-       private static final String PRIORIRY_QUEUE_STATE_NAME = 
"priorityQueueStateName";
+       private static final long serialVersionUID = -4166778210774160757L;
+
+       private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+
+       private final boolean isProcessingTime;
+
+       private final TypeSerializer<IN> inputSerializer;
 
        // necessary to extract the key from the input elements
        private final KeySelector<IN, KEY> keySelector;
@@ -64,29 +74,38 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
        // necessary to serialize the set of seen keys
        private final TypeSerializer<KEY> keySerializer;
 
-       private final PriorityQueueFactory<StreamRecord<IN>> 
priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
-       private final NFACompiler.NFAFactory<IN> nfaFactory;
+       ///////////////                 State                   //////////////
 
        // stores the keys we've already seen to trigger execution upon 
receiving a watermark
        // this can be problematic, since it is never cleared
        // TODO: fix once the state refactoring is completed
        private transient Set<KEY> keys;
 
+       private static final String NFA_OPERATOR_STATE_NAME = 
"nfaOperatorState";
+       private static final String PRIORITY_QUEUE_STATE_NAME = 
"priorityQueueStateName";
+
        private transient ValueState<NFA<IN>> nfaOperatorState;
        private transient ValueState<PriorityQueue<StreamRecord<IN>>> 
priorityQueueOperatorState;
 
-       public AbstractKeyedCEPPatternOperator(
-                       TypeSerializer<IN> inputSerializer,
-                       boolean isProcessingTime,
-                       KeySelector<IN, KEY> keySelector,
-                       TypeSerializer<KEY> keySerializer,
-                       NFACompiler.NFAFactory<IN> nfaFactory) {
-               super(inputSerializer, isProcessingTime);
+       private final PriorityQueueFactory<StreamRecord<IN>> 
priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
+       private final NFACompiler.NFAFactory<IN> nfaFactory;
 
-               this.keySelector = keySelector;
-               this.keySerializer = keySerializer;
+       public AbstractKeyedCEPPatternOperator(
+                       final TypeSerializer<IN> inputSerializer,
+                       final boolean isProcessingTime,
+                       final KeySelector<IN, KEY> keySelector,
+                       final TypeSerializer<KEY> keySerializer,
+                       final NFACompiler.NFAFactory<IN> nfaFactory) {
+
+               this.inputSerializer = 
Preconditions.checkNotNull(inputSerializer);
+               this.isProcessingTime = 
Preconditions.checkNotNull(isProcessingTime);
+               this.keySelector = Preconditions.checkNotNull(keySelector);
+               this.keySerializer = Preconditions.checkNotNull(keySerializer);
+               this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+       }
 
-               this.nfaFactory = nfaFactory;
+       public TypeSerializer<IN> getInputSerializer() {
+               return inputSerializer;
        }
 
        @Override
@@ -100,27 +119,27 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
 
                if (nfaOperatorState == null) {
                        nfaOperatorState = getPartitionedState(
-                                       new ValueStateDescriptor<NFA<IN>>(
-                                               NFA_OPERATOR_STATE_NAME,
-                                               new NFA.Serializer<IN>()));
+                               new 
ValueStateDescriptor<>(NFA_OPERATOR_STATE_NAME, new NFA.Serializer<IN>()));
                }
 
                @SuppressWarnings("unchecked,rawtypes")
                TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
-                               (TypeSerializer) new 
StreamElementSerializer<>(getInputSerializer());
+                       (TypeSerializer) new 
StreamElementSerializer<>(getInputSerializer());
 
                if (priorityQueueOperatorState == null) {
                        priorityQueueOperatorState = getPartitionedState(
-                                       new ValueStateDescriptor<>(
-                                               PRIORIRY_QUEUE_STATE_NAME,
-                                               new PriorityQueueSerializer<>(
-                                                               
streamRecordSerializer,
-                                                               new 
PriorityQueueStreamRecordFactory<IN>())));
+                               new ValueStateDescriptor<>(
+                                       PRIORITY_QUEUE_STATE_NAME,
+                                       new PriorityQueueSerializer<>(
+                                               streamRecordSerializer,
+                                               new 
PriorityQueueStreamRecordFactory<IN>()
+                                       )
+                               )
+                       );
                }
        }
 
-       @Override
-       protected NFA<IN> getNFA() throws IOException {
+       private NFA<IN> getNFA() throws IOException {
                NFA<IN> nfa = nfaOperatorState.value();
 
                if (nfa == null) {
@@ -132,13 +151,11 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
                return nfa;
        }
 
-       @Override
-       protected void updateNFA(NFA<IN> nfa) throws IOException {
+       private void updateNFA(NFA<IN> nfa) throws IOException {
                nfaOperatorState.update(nfa);
        }
 
-       @Override
-       protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws 
IOException {
+       private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws 
IOException {
                PriorityQueue<StreamRecord<IN>> priorityQueue = 
priorityQueueOperatorState.value();
 
                if (priorityQueue == null) {
@@ -150,8 +167,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
                return priorityQueue;
        }
 
-       @Override
-       protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> 
queue) throws IOException {
+       private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) 
throws IOException {
                priorityQueueOperatorState.update(queue);
        }
 
@@ -159,7 +175,24 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
        public void processElement(StreamRecord<IN> element) throws Exception {
                keys.add(keySelector.getKey(element.getValue()));
 
-               super.processElement(element);
+               if (isProcessingTime) {
+                       // there can be no out of order elements in processing 
time
+                       NFA<IN> nfa = getNFA();
+                       processEvent(nfa, element.getValue(), 
System.currentTimeMillis());
+                       updateNFA(nfa);
+               } else {
+                       PriorityQueue<StreamRecord<IN>> priorityQueue = 
getPriorityQueue();
+
+                       // event time processing
+                       // we have to buffer the elements until we receive the 
proper watermark
+                       if (getExecutionConfig().isObjectReuseEnabled()) {
+                               // copy the StreamRecord so that it cannot be 
changed
+                               priorityQueue.offer(new 
StreamRecord<IN>(inputSerializer.copy(element.getValue()), 
element.getTimestamp()));
+                       } else {
+                               priorityQueue.offer(element);
+                       }
+                       updatePriorityQueue(priorityQueue);
+               }
        }
 
        @Override
@@ -175,7 +208,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
                        NFA<IN> nfa = getNFA();
 
                        if (priorityQueue.isEmpty()) {
-                                       advanceTime(nfa, mark.getTimestamp());
+                               advanceTime(nfa, mark.getTimestamp());
                        } else {
                                while (!priorityQueue.isEmpty() && 
priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
                                        StreamRecord<IN> streamRecord = 
priorityQueue.poll();
@@ -191,6 +224,25 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
                output.emitWatermark(mark);
        }
 
+       /**
+        * Process the given event by giving it to the NFA and outputting the 
produced set of matched
+        * event sequences.
+        *
+        * @param nfa NFA to be used for the event detection
+        * @param event The current event to be processed
+        * @param timestamp The timestamp of the event
+        */
+       protected abstract void processEvent(NFA<IN> nfa, IN event, long 
timestamp);
+
+       /**
+        * Advances the time for the given NFA to the given timestamp. This can 
lead to pruning and
+        * timeouts.
+        *
+        * @param nfa to advance the time for
+        * @param timestamp to advance the time to
+        */
+       protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
+
        @Override
        public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
                DataOutputView ov = new DataOutputViewStreamWrapper(out);
@@ -216,6 +268,8 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
                }
        }
 
+       //////////////////////                  Utility Classes                 
//////////////////////
+
        /**
         * Custom type serializer implementation to serialize priority queues.
         *
@@ -228,7 +282,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
                private final TypeSerializer<T> elementSerializer;
                private final PriorityQueueFactory<T> factory;
 
-               public PriorityQueueSerializer(final TypeSerializer<T> 
elementSerializer, final PriorityQueueFactory<T> factory) {
+               PriorityQueueSerializer(final TypeSerializer<T> 
elementSerializer, final PriorityQueueFactory<T> factory) {
                        this.elementSerializer = elementSerializer;
                        this.factory = factory;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 39e2ccd..36f2e7a 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -21,7 +21,9 @@ package org.apache.flink.cep.operator;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -72,12 +74,18 @@ public class CEPOperatorUtils {
                                        keySerializer,
                                        nfaFactory));
                } else {
-                       patternStream = inputStream.transform(
+
+                       KeySelector<T, Byte> keySelector = new 
NullByteKeySelector<>();
+                       TypeSerializer<Byte> keySerializer = 
ByteSerializer.INSTANCE;
+
+                       patternStream = 
inputStream.keyBy(keySelector).transform(
                                "CEPPatternOperator",
                                (TypeInformation<Map<String, T>>) 
(TypeInformation<?>) TypeExtractor.getForClass(Map.class),
-                               new CEPPatternOperator<>(
+                               new KeyedCEPPatternOperator<>(
                                        inputSerializer,
                                        isProcessingTime,
+                                       keySelector,
+                                       keySerializer,
                                        nfaFactory
                                )).forceNonParallel();
                }
@@ -127,12 +135,19 @@ public class CEPOperatorUtils {
                                        keySerializer,
                                        nfaFactory));
                } else {
-                       patternStream = inputStream.transform(
+
+                       KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
+
+                       TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
+
+                       patternStream = inputStream.keyBy(keySelector).transform(
                                "TimeoutCEPPatternOperator",
                                eitherTypeInformation,
-                               new TimeoutCEPPatternOperator<>(
+                               new TimeoutKeyedCEPPatternOperator<>(
                                        inputSerializer,
                                        isProcessingTime,
+                                       keySelector,
+                                       keySerializer,
                                        nfaFactory
                                )).forceNonParallel();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
deleted file mode 100644
index 57f27c2..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * CEP pattern operator which only returns fully matched event patterns stored 
in a {@link Map}. The
- * events are indexed by the event names associated in the pattern 
specification.
- *
- * @param <IN> Type of the input events
- */
-public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, 
Map<String, IN>> {
-       private static final long serialVersionUID = 376300194236250645L;
-
-       public CEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean 
isProcessingTime, NFACompiler.NFAFactory<IN> nfaFactory) {
-               super(inputSerializer, isProcessingTime, nfaFactory);
-       }
-
-       @Override
-       protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-               Tuple2<Collection<Map<String, IN>>, 
Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
-                       event,
-                       timestamp);
-
-               Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-
-               emitMatchedSequences(matchedPatterns, timestamp);
-       }
-
-       @Override
-       protected void advanceTime(NFA<IN> nfa, long timestamp) {
-               Tuple2<Collection<Map<String, IN>>, 
Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, 
timestamp);
-
-               emitMatchedSequences(patterns.f0, timestamp);
-       }
-
-       private void emitMatchedSequences(Iterable<Map<String, IN>> 
matchedSequences, long timestamp) {
-               Iterator<Map<String, IN>> iterator = 
matchedSequences.iterator();
-
-               if (iterator.hasNext()) {
-                       StreamRecord<Map<String, IN>> streamRecord = new 
StreamRecord<Map<String, IN>>(
-                               null,
-                               timestamp);
-
-                       do {
-                               streamRecord.replace(iterator.next());
-                               output.collect(streamRecord);
-                       } while (iterator.hasNext());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 4d8a907..5b6ffe2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -40,25 +40,27 @@ import java.util.Map;
 public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, IN>> {
        private static final long serialVersionUID = 5328573789532074581L;
 
-       public KeyedCEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory) {
+       public KeyedCEPPatternOperator(
+                       TypeSerializer<IN> inputSerializer,
+                       boolean isProcessingTime,
+                       KeySelector<IN, KEY> keySelector,
+                       TypeSerializer<KEY> keySerializer,
+                       NFACompiler.NFAFactory<IN> nfaFactory) {
+
                super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
        }
 
        @Override
        protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-               Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
-                       event,
-                       timestamp);
-
-               Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-
-               emitMatchedSequences(matchedPatterns, timestamp);
+               Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+                       nfa.process(event, timestamp);
+               emitMatchedSequences(patterns.f0, timestamp);
        }
 
        @Override
        protected void advanceTime(NFA<IN> nfa, long timestamp) {
-               Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
-
+               Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+                       nfa.process(null, timestamp);
                emitMatchedSequences(patterns.f0, timestamp);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
deleted file mode 100644
index 9a04468..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.types.Either;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * CEP pattern operator which only returns fully matched event patterns and 
partially matched event
- * patterns which have timed out wrapped in {@link Either}. The matched events 
are stored in a
- * {@link Map} and are indexed by the event names associated in the pattern 
specification.
- *
- * The fully matched event patterns are returned as a {@link Either.Right} 
instance and the
- * partially matched event patterns are returned as a {@link Either.Left} 
instance.
- *
- * @param <IN> Type of the input events
- */
-public class TimeoutCEPPatternOperator<IN> extends 
AbstractCEPPatternOperator<IN, Either<Tuple2<Map<String, IN>, Long>, 
Map<String, IN>>> {
-       private static final long serialVersionUID = -3911002597290988201L;
-
-       public TimeoutCEPPatternOperator(
-               TypeSerializer<IN> inputSerializer,
-               boolean isProcessingTime,
-               NFACompiler.NFAFactory<IN> nfaFactory) {
-
-               super(inputSerializer, isProcessingTime, nfaFactory);
-       }
-
-       @Override
-       protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-               Tuple2<Collection<Map<String, IN>>, 
Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
-                       event,
-                       timestamp);
-
-               Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-               Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = 
patterns.f1;
-
-               emitMatchedSequences(matchedPatterns, timestamp);
-               emitTimedOutSequences(partialPatterns, timestamp);
-       }
-
-       @Override
-       protected void advanceTime(NFA<IN> nfa, long timestamp) {
-               Tuple2<Collection<Map<String, IN>>, 
Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, 
timestamp);
-
-               emitMatchedSequences(patterns.f0, timestamp);
-               emitTimedOutSequences(patterns.f1, timestamp);
-       }
-
-       private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, 
Long>> timedOutSequences, long timestamp) {
-               StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, 
IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, 
Map<String, IN>>>(
-                       null,
-                       timestamp);
-
-               for (Tuple2<Map<String, IN>, Long> partialPattern: 
timedOutSequences) {
-                       streamRecord.replace(Either.Left(partialPattern));
-                       output.collect(streamRecord);
-               }
-       }
-
-       protected void emitMatchedSequences(Iterable<Map<String, IN>> 
matchedSequences, long timestamp) {
-               StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, 
IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, 
Map<String, IN>>>(
-                       null,
-                       timestamp);
-
-               for (Map<String, IN> matchedPattern : matchedSequences) {
-                       streamRecord.replace(Either.Right(matchedPattern));
-                       output.collect(streamRecord);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 4d33435..6889bb9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -29,44 +29,48 @@ import org.apache.flink.types.Either;
 import java.util.Collection;
 import java.util.Map;
 
+/**
+ * CEP pattern operator which returns fully and partially matched (timed-out) event patterns stored in a
+ * {@link Map}. The events are indexed by the event names associated in the pattern specification. The
+ * operator works on keyed input data.
+ *
+ * @param <IN> Type of the input events
+ * @param <KEY> Type of the key
+ */
 public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> {
        private static final long serialVersionUID = 3570542177814518158L;
 
        public TimeoutKeyedCEPPatternOperator(
-               TypeSerializer<IN> inputSerializer,
-               boolean isProcessingTime,
-               KeySelector<IN, KEY> keySelector,
-               TypeSerializer<KEY> keySerializer,
-               NFACompiler.NFAFactory<IN> nfaFactory) {
+                       TypeSerializer<IN> inputSerializer,
+                       boolean isProcessingTime,
+                       KeySelector<IN, KEY> keySelector,
+                       TypeSerializer<KEY> keySerializer,
+                       NFACompiler.NFAFactory<IN> nfaFactory) {
 
                super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
        }
 
        @Override
        protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-               Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
-                       event,
-                       timestamp);
-
-               Collection<Map<String, IN>> matchedSequences = patterns.f0;
-               Collection<Tuple2<Map<String, IN>, Long>> timedOutSequences = patterns.f1;
+               Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+                       nfa.process(event, timestamp);
 
-               emitMatchedSequences(matchedSequences, timestamp);
-               emitTimedOutSequences(timedOutSequences, timestamp);
+               emitMatchedSequences(patterns.f0, timestamp);
+               emitTimedOutSequences(patterns.f1, timestamp);
        }
 
        @Override
        protected void advanceTime(NFA<IN> nfa, long timestamp) {
-               Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+               Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+                       nfa.process(null, timestamp);
 
                emitMatchedSequences(patterns.f0, timestamp);
                emitTimedOutSequences(patterns.f1, timestamp);
        }
 
        private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) {
-               StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
-                       null,
-                       timestamp);
+               StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord =
+                       new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(null, timestamp);
 
                for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
                        streamRecord.replace(Either.Left(partialPattern));
@@ -75,9 +79,8 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
        }
 
        protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
-               StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
-                       null,
-                       timestamp);
+               StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord =
+                       new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(null, timestamp);
 
                for (Map<String, IN> matchedPattern : matchedSequences) {
                        streamRecord.replace(Either.Right(matchedPattern));

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index db17f6d..f90b670 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -57,29 +57,6 @@ public class CEPOperatorTest extends TestLogger {
        public TemporaryFolder tempFolder = new TemporaryFolder();
 
        @Test
-       public void testCEPOperatorWatermarkForwarding() throws Exception {
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = new OneInputStreamOperatorTestHarness<>(
-                       new CEPPatternOperator<>(
-                               Event.createTypeSerializer(),
-                               false,
-                               new NFAFactory())
-               );
-
-               harness.open();
-
-               Watermark expectedWatermark = new Watermark(42L);
-
-               harness.processWatermark(expectedWatermark);
-
-               Object watermark = harness.getOutput().poll();
-
-               assertTrue(watermark instanceof Watermark);
-               assertEquals(expectedWatermark, watermark);
-
-               harness.close();
-       }
-
-       @Test
        public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
                KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
                        private static final long serialVersionUID = 
-4873366487571254798L;
@@ -115,94 +92,6 @@ public class CEPOperatorTest extends TestLogger {
        }
 
        @Test
-       public void testCEPOperatorCheckpointing() throws Exception {
-               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
-                       private static final long serialVersionUID = 
-4873366487571254798L;
-
-                       @Override
-                       public Integer getKey(Event value) throws Exception {
-                               return value.getId();
-                       }
-               };
-
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = new OneInputStreamOperatorTestHarness<>(
-                               new CEPPatternOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               new NFAFactory()));
-
-               harness.open();
-
-               Event startEvent = new Event(42, "start", 1.0);
-               SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-               Event endEvent=  new Event(42, "end", 1.0);
-
-               harness.processElement(new StreamRecord<Event>(startEvent, 1));
-               harness.processElement(new StreamRecord<Event>(new Event(42, 
"foobar", 1.0), 2));
-
-               // simulate snapshot/restore with some elements in internal 
sorting queue
-               StreamStateHandle snapshot = harness.snapshotLegacy(0, 0);
-               harness.close();
-
-               harness = new OneInputStreamOperatorTestHarness<>(
-                               new CEPPatternOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               new NFAFactory()));
-
-               harness.setup();
-               harness.restore(snapshot);
-               harness.open();
-
-               harness.processWatermark(new Watermark(Long.MIN_VALUE));
-
-               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
-
-               // if element timestamps are not correctly 
checkpointed/restored this will lead to
-               // a pruning time underflow exception in NFA
-               harness.processWatermark(new Watermark(2));
-
-               // simulate snapshot/restore with empty element queue but NFA 
state
-               StreamStateHandle snapshot2 = harness.snapshotLegacy(1, 1);
-               harness.close();
-
-               harness = new OneInputStreamOperatorTestHarness<>(
-                               new CEPPatternOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               new NFAFactory()));
-
-               harness.setup();
-               harness.restore(snapshot2);
-               harness.open();
-
-               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
-               harness.processElement(new StreamRecord<Event>(new Event(42, 
"start", 1.0), 4));
-               harness.processElement(new StreamRecord<Event>(endEvent, 5));
-
-               harness.processWatermark(new Watermark(Long.MAX_VALUE));
-
-               ConcurrentLinkedQueue<Object> result = harness.getOutput();
-
-               // watermark and the result
-               assertEquals(2, result.size());
-
-               Object resultObject = result.poll();
-               assertTrue(resultObject instanceof StreamRecord);
-               StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-               assertTrue(resultRecord.getValue() instanceof Map);
-
-               @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
-
-               assertEquals(startEvent, patternMap.get("start"));
-               assertEquals(middleEvent, patternMap.get("middle"));
-               assertEquals(endEvent, patternMap.get("end"));
-
-               harness.close();
-       }
-
-       @Test
        public void testKeyedCEPOperatorCheckpointing() throws Exception {
 
                KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
@@ -493,71 +382,6 @@ public class CEPOperatorTest extends TestLogger {
                }
        }
 
-       /**
-        * Tests that the internal time of a CEP operator advances only given 
watermarks. See FLINK-5033
-        */
-       @Test
-       public void testAdvancingTimeWithoutElements() throws Exception {
-               final Event startEvent = new Event(42, "start", 1.0);
-               final long watermarkTimestamp1 = 5L;
-               final long watermarkTimestamp2 = 13L;
-
-               final Map<String, Event> expectedSequence = new HashMap<>(2);
-               expectedSequence.put("start", startEvent);
-
-               OneInputStreamOperatorTestHarness<Event, 
Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new 
OneInputStreamOperatorTestHarness<>(
-                       new TimeoutCEPPatternOperator<>(
-                               Event.createTypeSerializer(),
-                               false,
-                               new NFAFactory(true))
-               );
-
-               try {
-                       harness.setup(
-                               new KryoSerializer<>(
-                                       (Class<Either<Tuple2<Map<String, 
Event>, Long>, Map<String, Event>>>) (Object) Either.class,
-                                       new ExecutionConfig()));
-                       harness.open();
-
-                       harness.processElement(new StreamRecord<>(startEvent, 
3L));
-                       harness.processWatermark(new 
Watermark(watermarkTimestamp1));
-                       harness.processWatermark(new 
Watermark(watermarkTimestamp2));
-
-                       Queue<Object> result = harness.getOutput();
-
-                       assertEquals(3, result.size());
-
-                       Object watermark1 = result.poll();
-
-                       assertTrue(watermark1 instanceof Watermark);
-
-                       assertEquals(watermarkTimestamp1, ((Watermark) 
watermark1).getTimestamp());
-
-                       Object resultObject = result.poll();
-
-                       assertTrue(resultObject instanceof StreamRecord);
-
-                       StreamRecord<Either<Tuple2<Map<String, Event>, Long>, 
Map<String, Event>>> streamRecord = 
(StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) 
resultObject;
-
-                       assertTrue(streamRecord.getValue() instanceof 
Either.Left);
-
-                       Either.Left<Tuple2<Map<String, Event>, Long>, 
Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, 
Map<String, Event>>) streamRecord.getValue();
-
-                       Tuple2<Map<String, Event>, Long> leftResult = 
left.left();
-
-                       assertEquals(watermarkTimestamp2, (long) leftResult.f1);
-                       assertEquals(expectedSequence, leftResult.f0);
-
-                       Object watermark2 = result.poll();
-
-                       assertTrue(watermark2 instanceof Watermark);
-
-                       assertEquals(watermarkTimestamp2, ((Watermark) 
watermark2).getTimestamp());
-               } finally {
-                       harness.close();
-               }
-       }
-
        private static class NFAFactory implements 
NFACompiler.NFAFactory<Event> {
 
                private static final long serialVersionUID = 
1173020762472766713L;

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 4f4546e..f883ef5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -1124,17 +1125,4 @@ public class AllWindowedStream<T, W extends Window> {
        public TypeInformation<T> getInputType() {
                return input.getType();
        }
-
-       /**
-        * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
-        * @param <T>
-        */
-       private static class NullByteKeySelector<T> implements KeySelector<T, Byte> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Byte getKey(T value) throws Exception {
-                       return 0;
-               }
-       }
 }

Reply via email to