Repository: flink Updated Branches: refs/heads/master dd8ef550c -> 15ae922ad
[FLINK-5845] [cep] Unify keyed and non-keyed operators. Now all cep operators are keyed, and for the non-keyed usecases, we key on a dummy key and use the keyed operator. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15ae922a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15ae922a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15ae922a Branch: refs/heads/master Commit: 15ae922ad4151701cbb4e0df207f43d0094366d1 Parents: dd8ef55 Author: kl0u <[email protected]> Authored: Thu Feb 16 12:02:25 2017 +0100 Committer: kl0u <[email protected]> Committed: Fri Feb 24 11:10:40 2017 +0100 ---------------------------------------------------------------------- .../flink/api/java/functions/KeySelector.java | 2 +- .../api/java/functions/NullByteKeySelector.java | 39 ++++ .../AbstractCEPBasePatternOperator.java | 108 ------------ .../operator/AbstractCEPPatternOperator.java | 142 --------------- .../AbstractKeyedCEPPatternOperator.java | 128 ++++++++++---- .../flink/cep/operator/CEPOperatorUtils.java | 23 ++- .../flink/cep/operator/CEPPatternOperator.java | 76 -------- .../cep/operator/KeyedCEPPatternOperator.java | 22 +-- .../cep/operator/TimeoutCEPPatternOperator.java | 94 ---------- .../TimeoutKeyedCEPPatternOperator.java | 43 ++--- .../flink/cep/operator/CEPOperatorTest.java | 176 ------------------- .../api/datastream/AllWindowedStream.java | 14 +- 12 files changed, 186 insertions(+), 681 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java index d96f078..da3b3e2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java @@ -34,7 +34,7 @@ import java.io.Serializable; */ @Public public interface KeySelector<IN, KEY> extends Function, Serializable { - + /** * User-defined function that extracts the key from an arbitrary object. * http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java b/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java new file mode 100644 index 0000000..4aa533d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.functions; + +import org.apache.flink.annotation.Internal; + +/** + * Used as a dummy {@link KeySelector} to allow using keyed operators + * for non-keyed usecases. Essentially, it gives all incoming records + * the same key, which is a {@code (byte) 0} value. + * + * @param <T> The type of the input element. + */ +@Internal +public class NullByteKeySelector<T> implements KeySelector<T, Byte> { + + private static final long serialVersionUID = 614256539098549020L; + + @Override + public Byte getKey(T value) throws Exception { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java deleted file mode 100644 index a3497a6..0000000 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep.operator; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.cep.nfa.NFA; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.io.IOException; -import java.util.PriorityQueue; - -/** - * Base class for CEP pattern operator. The operator uses a {@link NFA} to detect complex event - * patterns. The detected event patterns are then outputted to the down stream operators. - * - * @param <IN> Type of the input elements - * @param <OUT> Type fo the output elements - */ -public abstract class AbstractCEPBasePatternOperator<IN, OUT> - extends AbstractStreamOperator<OUT> - implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator { - - private static final long serialVersionUID = -4166778210774160757L; - - protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11; - - private final TypeSerializer<IN> inputSerializer; - private final boolean isProcessingTime; - - public AbstractCEPBasePatternOperator( - final TypeSerializer<IN> inputSerializer, - final boolean isProcessingTime) { - this.inputSerializer = inputSerializer; - this.isProcessingTime = isProcessingTime; - } - - public TypeSerializer<IN> getInputSerializer() { - return inputSerializer; - } - - protected abstract NFA<IN> getNFA() throws IOException; - - protected abstract void updateNFA(NFA<IN> nfa) throws IOException; - - protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException; - - protected abstract void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException; - - @Override - public void processElement(StreamRecord<IN> element) throws Exception { - if (isProcessingTime) { - // there can be no out of order elements in processing time - NFA<IN> nfa = getNFA(); - processEvent(nfa, element.getValue(), System.currentTimeMillis()); - updateNFA(nfa); - } else { - PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); - - // event time processing - // we have to buffer the elements until we receive the proper watermark - if (getExecutionConfig().isObjectReuseEnabled()) { - // copy the StreamRecord so that it cannot be changed - priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp())); - } else { - priorityQueue.offer(element); - } - updatePriorityQueue(priorityQueue); - } - } - - /** - * Process the given event by giving it to the NFA and outputting the produced set of matched - * event sequences. - * - * @param nfa NFA to be used for the event detection - * @param event The current event to be processed - * @param timestamp The timestamp of the event - */ - protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp); - - /** - * Advances the time for the given NFA to the given timestamp. This can lead to pruning and - * timeouts. - * - * @param nfa to advance the time for - * @param timestamp to advance the time to - */ - protected abstract void advanceTime(NFA<IN> nfa, long timestamp); -} http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java deleted file mode 100644 index fe9aced..0000000 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep.operator; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.cep.nfa.NFA; -import org.apache.flink.cep.nfa.compiler.NFACompiler; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; -import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.PriorityQueue; - -/** - * Abstract CEP pattern operator which is used for non keyed streams. Consequently, - * the operator state only includes a single {@link NFA} and a priority queue to order out of order - * elements in case of event time processing. - * - * @param <IN> Type of the input elements - * @param <OUT> Type of the output elements - */ -abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> { - private static final long serialVersionUID = 7487334510746595640L; - - private final StreamElementSerializer<IN> streamRecordSerializer; - - // global nfa for all elements - private NFA<IN> nfa; - - // queue to buffer out of order stream records - private transient PriorityQueue<StreamRecord<IN>> priorityQueue; - - public AbstractCEPPatternOperator( - TypeSerializer<IN> inputSerializer, - boolean isProcessingTime, - NFACompiler.NFAFactory<IN> nfaFactory) { - super(inputSerializer, isProcessingTime); - - this.streamRecordSerializer = new StreamElementSerializer<>(inputSerializer); - this.nfa = nfaFactory.createNFA(); - } - - @Override - public void open() throws Exception { - super.open(); - if (priorityQueue == null) { - priorityQueue = new PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>()); - } - } - - @Override - protected NFA<IN> getNFA() throws IOException { - return nfa; - } - - @Override - protected void updateNFA(NFA<IN> nfa) { - // a no-op, because we only have one NFA - } - - @Override - protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException { - return priorityQueue; - } - - @Override - protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) { - // a no-op, because we only have one priority queue - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - // we do our own watermark handling, no super call. we will never be able to use - // the timer service like this, however. - - if (priorityQueue.isEmpty()) { - advanceTime(nfa, mark.getTimestamp()); - } else { - while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) { - StreamRecord<IN> streamRecord = priorityQueue.poll(); - - processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); - } - } - - output.emitWatermark(mark); - } - - @Override - public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { - final ObjectOutputStream oos = new ObjectOutputStream(out); - - oos.writeObject(nfa); - oos.writeInt(priorityQueue.size()); - - for (StreamRecord<IN> streamRecord: priorityQueue) { - streamRecordSerializer.serialize(streamRecord, new DataOutputViewStreamWrapper(oos)); - } - oos.flush(); - } - - @Override - @SuppressWarnings("unchecked") - public void restoreState(FSDataInputStream state) throws Exception { - final ObjectInputStream ois = new ObjectInputStream(state); - - nfa = (NFA<IN>)ois.readObject(); - - int numberPriorityQueueEntries = ois.readInt(); - - priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>()); - - for (int i = 0; i <numberPriorityQueueEntries; i++) { - StreamElement streamElement = streamRecordSerializer.deserialize(new DataInputViewStreamWrapper(ois)); - priorityQueue.offer(streamElement.<IN>asRecord()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 832a0ba..90ee846 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -30,9 +30,13 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; import java.io.IOException; import java.io.Serializable; @@ -44,7 +48,7 @@ import java.util.Set; /** * Abstract CEP pattern operator for a keyed input stream. For each key, the operator creates * a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are - * stored using the key value state. Additionally, the set of all seen keys is kept as part of the + * stored using the managed keyed state. Additionally, the set of all seen keys is kept as part of the * operator state. This is necessary to trigger the execution for all keys upon receiving a new * watermark. * @@ -52,11 +56,17 @@ import java.util.Set; * @param <KEY> Type of the key on which the input stream is keyed * @param <OUT> Type of the output elements */ -abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> { - private static final long serialVersionUID = -7234999752950159178L; +public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> + extends AbstractStreamOperator<OUT> + implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator { - private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState"; - private static final String PRIORIRY_QUEUE_STATE_NAME = "priorityQueueStateName"; + private static final long serialVersionUID = -4166778210774160757L; + + private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11; + + private final boolean isProcessingTime; + + private final TypeSerializer<IN> inputSerializer; // necessary to extract the key from the input elements private final KeySelector<IN, KEY> keySelector; @@ -64,29 +74,38 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst // necessary to serialize the set of seen keys private final TypeSerializer<KEY> keySerializer; - private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>(); - private final NFACompiler.NFAFactory<IN> nfaFactory; + /////////////// State ////////////// // stores the keys we've already seen to trigger execution upon receiving a watermark // this can be problematic, since it is never cleared // TODO: fix once the state refactoring is completed private transient Set<KEY> keys; + private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState"; + private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName"; + private transient ValueState<NFA<IN>> nfaOperatorState; private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState; - public AbstractKeyedCEPPatternOperator( - TypeSerializer<IN> inputSerializer, - boolean isProcessingTime, - KeySelector<IN, KEY> keySelector, - TypeSerializer<KEY> keySerializer, - NFACompiler.NFAFactory<IN> nfaFactory) { - super(inputSerializer, isProcessingTime); + private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>(); + private final NFACompiler.NFAFactory<IN> nfaFactory; - this.keySelector = keySelector; - this.keySerializer = keySerializer; + public AbstractKeyedCEPPatternOperator( + final TypeSerializer<IN> inputSerializer, + final boolean isProcessingTime, + final KeySelector<IN, KEY> keySelector, + final TypeSerializer<KEY> keySerializer, + final NFACompiler.NFAFactory<IN> nfaFactory) { + + this.inputSerializer = Preconditions.checkNotNull(inputSerializer); + this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime); + this.keySelector = Preconditions.checkNotNull(keySelector); + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.nfaFactory = Preconditions.checkNotNull(nfaFactory); + } - this.nfaFactory = nfaFactory; + public TypeSerializer<IN> getInputSerializer() { + return inputSerializer; } @Override @@ -100,27 +119,27 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst if (nfaOperatorState == null) { nfaOperatorState = getPartitionedState( - new ValueStateDescriptor<NFA<IN>>( - NFA_OPERATOR_STATE_NAME, - new NFA.Serializer<IN>())); + new ValueStateDescriptor<>(NFA_OPERATOR_STATE_NAME, new NFA.Serializer<IN>())); } @SuppressWarnings("unchecked,rawtypes") TypeSerializer<StreamRecord<IN>> streamRecordSerializer = - (TypeSerializer) new StreamElementSerializer<>(getInputSerializer()); + (TypeSerializer) new StreamElementSerializer<>(getInputSerializer()); if (priorityQueueOperatorState == null) { priorityQueueOperatorState = getPartitionedState( - new ValueStateDescriptor<>( - PRIORIRY_QUEUE_STATE_NAME, - new PriorityQueueSerializer<>( - streamRecordSerializer, - new PriorityQueueStreamRecordFactory<IN>()))); + new ValueStateDescriptor<>( + PRIORITY_QUEUE_STATE_NAME, + new PriorityQueueSerializer<>( + streamRecordSerializer, + new PriorityQueueStreamRecordFactory<IN>() + ) + ) + ); } } - @Override - protected NFA<IN> getNFA() throws IOException { + private NFA<IN> getNFA() throws IOException { NFA<IN> nfa = nfaOperatorState.value(); if (nfa == null) { @@ -132,13 +151,11 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst return nfa; } - @Override - protected void updateNFA(NFA<IN> nfa) throws IOException { + private void updateNFA(NFA<IN> nfa) throws IOException { nfaOperatorState.update(nfa); } - @Override - protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException { + private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException { PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value(); if (priorityQueue == null) { @@ -150,8 +167,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst return priorityQueue; } - @Override - protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException { + private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException { priorityQueueOperatorState.update(queue); } @@ -159,7 +175,24 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst public void processElement(StreamRecord<IN> element) throws Exception { keys.add(keySelector.getKey(element.getValue())); - super.processElement(element); + if (isProcessingTime) { + // there can be no out of order elements in processing time + NFA<IN> nfa = getNFA(); + processEvent(nfa, element.getValue(), System.currentTimeMillis()); + updateNFA(nfa); + } else { + PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); + + // event time processing + // we have to buffer the elements until we receive the proper watermark + if (getExecutionConfig().isObjectReuseEnabled()) { + // copy the StreamRecord so that it cannot be changed + priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp())); + } else { + priorityQueue.offer(element); + } + updatePriorityQueue(priorityQueue); + } } @Override @@ -175,7 +208,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst NFA<IN> nfa = getNFA(); if (priorityQueue.isEmpty()) { - advanceTime(nfa, mark.getTimestamp()); + advanceTime(nfa, mark.getTimestamp()); } else { while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) { StreamRecord<IN> streamRecord = priorityQueue.poll(); @@ -191,6 +224,25 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst output.emitWatermark(mark); } + /** + * Process the given event by giving it to the NFA and outputting the produced set of matched + * event sequences. + * + * @param nfa NFA to be used for the event detection + * @param event The current event to be processed + * @param timestamp The timestamp of the event + */ + protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp); + + /** + * Advances the time for the given NFA to the given timestamp. This can lead to pruning and + * timeouts. + * + * @param nfa to advance the time for + * @param timestamp to advance the time to + */ + protected abstract void advanceTime(NFA<IN> nfa, long timestamp); + @Override public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { DataOutputView ov = new DataOutputViewStreamWrapper(out); @@ -216,6 +268,8 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst } } + ////////////////////// Utility Classes ////////////////////// + /** * Custom type serializer implementation to serialize priority queues. * @@ -228,7 +282,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst private final TypeSerializer<T> elementSerializer; private final PriorityQueueFactory<T> factory; - public PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) { + PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) { this.elementSerializer = elementSerializer; this.factory = factory; } http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index 39e2ccd..36f2e7a 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -21,7 +21,9 @@ package org.apache.flink.cep.operator; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ByteSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.functions.NullByteKeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.EitherTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -72,12 +74,18 @@ public class CEPOperatorUtils { keySerializer, nfaFactory)); } else { - patternStream = inputStream.transform( + + KeySelector<T, Byte> keySelector = new NullByteKeySelector<>(); + TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE; + + patternStream = inputStream.keyBy(keySelector).transform( "CEPPatternOperator", (TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class), - new CEPPatternOperator<>( + new KeyedCEPPatternOperator<>( inputSerializer, isProcessingTime, + keySelector, + keySerializer, nfaFactory )).forceNonParallel(); } @@ -127,12 +135,19 @@ public class CEPOperatorUtils { keySerializer, nfaFactory)); } else { - patternStream = inputStream.transform( + + KeySelector<T, Byte> keySelector = new NullByteKeySelector<>(); + + TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE; + + patternStream = inputStream.keyBy(keySelector).transform( "TimeoutCEPPatternOperator", eitherTypeInformation, - new TimeoutCEPPatternOperator<>( + new TimeoutKeyedCEPPatternOperator<>( inputSerializer, isProcessingTime, + keySelector, + keySerializer, nfaFactory )).forceNonParallel(); } http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java deleted file mode 100644 index 57f27c2..0000000 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep.operator; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.cep.nfa.NFA; -import org.apache.flink.cep.nfa.compiler.NFACompiler; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; - -/** - * CEP pattern operator which only returns fully matched event patterns stored in a {@link Map}. The - * events are indexed by the event names associated in the pattern specification. - * - * @param <IN> Type of the input events - */ -public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, Map<String, IN>> { - private static final long serialVersionUID = 376300194236250645L; - - public CEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, NFACompiler.NFAFactory<IN> nfaFactory) { - super(inputSerializer, isProcessingTime, nfaFactory); - } - - @Override - protected void processEvent(NFA<IN> nfa, IN event, long timestamp) { - Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process( - event, - timestamp); - - Collection<Map<String, IN>> matchedPatterns = patterns.f0; - - emitMatchedSequences(matchedPatterns, timestamp); - } - - @Override - protected void advanceTime(NFA<IN> nfa, long timestamp) { - Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp); - - emitMatchedSequences(patterns.f0, timestamp); - } - - private void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) { - Iterator<Map<String, IN>> iterator = matchedSequences.iterator(); - - if (iterator.hasNext()) { - StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>( - null, - timestamp); - - do { - streamRecord.replace(iterator.next()); - output.collect(streamRecord); - } while (iterator.hasNext()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java index 4d8a907..5b6ffe2 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java @@ -40,25 +40,27 @@ import java.util.Map; public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, IN>> { private static final long serialVersionUID = 5328573789532074581L; - public KeyedCEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory) { + public KeyedCEPPatternOperator( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + KeySelector<IN, KEY> keySelector, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory) { + super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory); } @Override protected void processEvent(NFA<IN> nfa, IN event, long timestamp) { - Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process( - event, - timestamp); - - Collection<Map<String, IN>> matchedPatterns = patterns.f0; - - emitMatchedSequences(matchedPatterns, timestamp); + Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = + nfa.process(event, timestamp); + emitMatchedSequences(patterns.f0, timestamp); } @Override protected void advanceTime(NFA<IN> nfa, long timestamp) { - Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp); - + Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = + nfa.process(null, timestamp); emitMatchedSequences(patterns.f0, timestamp); } http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java deleted file mode 100644 index 9a04468..0000000 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep.operator; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.cep.nfa.NFA; -import org.apache.flink.cep.nfa.compiler.NFACompiler; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.types.Either; - -import java.util.Collection; -import java.util.Map; - -/** - * CEP pattern operator which only returns fully matched event patterns and partially matched event - * patterns which have timed out wrapped in {@link Either}. The matched events are stored in a - * {@link Map} and are indexed by the event names associated in the pattern specification. - * - * The fully matched event patterns are returned as a {@link Either.Right} instance and the - * partially matched event patterns are returned as a {@link Either.Left} instance. - * - * @param <IN> Type of the input events - */ -public class TimeoutCEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> { - private static final long serialVersionUID = -3911002597290988201L; - - public TimeoutCEPPatternOperator( - TypeSerializer<IN> inputSerializer, - boolean isProcessingTime, - NFACompiler.NFAFactory<IN> nfaFactory) { - - super(inputSerializer, isProcessingTime, nfaFactory); - } - - @Override - protected void processEvent(NFA<IN> nfa, IN event, long timestamp) { - Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process( - event, - timestamp); - - Collection<Map<String, IN>> matchedPatterns = patterns.f0; - Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1; - - emitMatchedSequences(matchedPatterns, timestamp); - emitTimedOutSequences(partialPatterns, timestamp); - } - - @Override - protected void advanceTime(NFA<IN> nfa, long timestamp) { - Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp); - - emitMatchedSequences(patterns.f0, timestamp); - emitTimedOutSequences(patterns.f1, timestamp); - } - - private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) { - StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>( - null, - timestamp); - - for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) { - streamRecord.replace(Either.Left(partialPattern)); - output.collect(streamRecord); - } - } - - protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) { - StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>( - null, - timestamp); - - for (Map<String, IN> matchedPattern : matchedSequences) { - streamRecord.replace(Either.Right(matchedPattern)); - output.collect(streamRecord); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java index 4d33435..6889bb9 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java @@ -29,44 +29,48 @@ import org.apache.flink.types.Either; import java.util.Collection; import java.util.Map; +/** + * CEP pattern operator which returns fully and partially matched (timed-out) event patterns stored in a + * {@link Map}. The events are indexed by the event names associated in the pattern specification. The + * operator works on keyed input data. + * + * @param <IN> Type of the input events + * @param <KEY> Type of the key + */ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> { private static final long serialVersionUID = 3570542177814518158L; public TimeoutKeyedCEPPatternOperator( - TypeSerializer<IN> inputSerializer, - boolean isProcessingTime, - KeySelector<IN, KEY> keySelector, - TypeSerializer<KEY> keySerializer, - NFACompiler.NFAFactory<IN> nfaFactory) { + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + KeySelector<IN, KEY> keySelector, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory) { super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory); } @Override protected void processEvent(NFA<IN> nfa, IN event, long timestamp) { - Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process( - event, - timestamp); - - Collection<Map<String, IN>> matchedSequences = patterns.f0; - Collection<Tuple2<Map<String, IN>, Long>> timedOutSequences = patterns.f1; + Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = + nfa.process(event, timestamp); - emitMatchedSequences(matchedSequences, timestamp); - emitTimedOutSequences(timedOutSequences, timestamp); + emitMatchedSequences(patterns.f0, timestamp); + emitTimedOutSequences(patterns.f1, timestamp); } @Override protected void advanceTime(NFA<IN> nfa, long timestamp) { - Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp); + Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = + nfa.process(null, timestamp); emitMatchedSequences(patterns.f0, timestamp); emitTimedOutSequences(patterns.f1, timestamp); } private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) { - StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>( - null, - timestamp); + StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = + new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(null, timestamp); for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) { streamRecord.replace(Either.Left(partialPattern)); @@ -75,9 +79,8 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat } protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) { - StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>( - null, - timestamp); + StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = + new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(null, timestamp); for (Map<String, IN> matchedPattern : matchedSequences) { streamRecord.replace(Either.Right(matchedPattern)); http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index db17f6d..f90b670 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -57,29 +57,6 @@ public class CEPOperatorTest extends TestLogger { public TemporaryFolder tempFolder = new TemporaryFolder(); @Test - public void testCEPOperatorWatermarkForwarding() throws Exception { - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>( - new CEPPatternOperator<>( - Event.createTypeSerializer(), - false, - new NFAFactory()) - ); - - harness.open(); - - Watermark expectedWatermark = new Watermark(42L); - - harness.processWatermark(expectedWatermark); - - Object watermark = harness.getOutput().poll(); - - assertTrue(watermark instanceof Watermark); - assertEquals(expectedWatermark, watermark); - - harness.close(); - } - - @Test public void testKeyedCEPOperatorWatermarkForwarding() throws Exception { KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { private static final long serialVersionUID = -4873366487571254798L; @@ -115,94 +92,6 @@ public class CEPOperatorTest extends TestLogger { } @Test - public void testCEPOperatorCheckpointing() throws Exception { - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>( - new CEPPatternOperator<>( - Event.createTypeSerializer(), - false, - new NFAFactory())); - - harness.open(); - - Event startEvent = new Event(42, "start", 1.0); - SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); - Event endEvent= new Event(42, "end", 1.0); - - harness.processElement(new StreamRecord<Event>(startEvent, 1)); - harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); - - // simulate snapshot/restore with some elements in internal sorting queue - StreamStateHandle snapshot = harness.snapshotLegacy(0, 0); - harness.close(); - - harness = new OneInputStreamOperatorTestHarness<>( - new CEPPatternOperator<>( - Event.createTypeSerializer(), - false, - new NFAFactory())); - - harness.setup(); - harness.restore(snapshot); - harness.open(); - - harness.processWatermark(new Watermark(Long.MIN_VALUE)); - - harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); - - // if element timestamps are not correctly checkpointed/restored this will lead to - // a pruning time underflow exception in NFA - harness.processWatermark(new Watermark(2)); - - // simulate snapshot/restore with empty element queue but NFA state - StreamStateHandle snapshot2 = harness.snapshotLegacy(1, 1); - harness.close(); - - harness = new OneInputStreamOperatorTestHarness<>( - new CEPPatternOperator<>( - Event.createTypeSerializer(), - false, - new NFAFactory())); - - harness.setup(); - harness.restore(snapshot2); - harness.open(); - - harness.processElement(new StreamRecord<Event>(middleEvent, 3)); - harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4)); - harness.processElement(new StreamRecord<Event>(endEvent, 5)); - - harness.processWatermark(new Watermark(Long.MAX_VALUE)); - - ConcurrentLinkedQueue<Object> result = harness.getOutput(); - - // watermark and the result - assertEquals(2, result.size()); - - Object resultObject = result.poll(); - assertTrue(resultObject instanceof StreamRecord); - StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject; - assertTrue(resultRecord.getValue() instanceof Map); - - @SuppressWarnings("unchecked") - Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); - - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); - - harness.close(); - } - - @Test public void testKeyedCEPOperatorCheckpointing() throws Exception { KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { @@ -493,71 +382,6 @@ public class CEPOperatorTest extends TestLogger { } } - /** - * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033 - */ - @Test - public void testAdvancingTimeWithoutElements() throws Exception { - final Event startEvent = new Event(42, "start", 1.0); - final long watermarkTimestamp1 = 5L; - final long watermarkTimestamp2 = 13L; - - final Map<String, Event> expectedSequence = new HashMap<>(2); - expectedSequence.put("start", startEvent); - - OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new OneInputStreamOperatorTestHarness<>( - new TimeoutCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - new NFAFactory(true)) - ); - - try { - harness.setup( - new KryoSerializer<>( - (Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>) (Object) Either.class, - new ExecutionConfig())); - harness.open(); - - harness.processElement(new StreamRecord<>(startEvent, 3L)); - harness.processWatermark(new Watermark(watermarkTimestamp1)); - harness.processWatermark(new Watermark(watermarkTimestamp2)); - - Queue<Object> result = harness.getOutput(); - - assertEquals(3, result.size()); - - Object watermark1 = result.poll(); - - assertTrue(watermark1 instanceof Watermark); - - assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp()); - - Object resultObject = result.poll(); - - assertTrue(resultObject instanceof StreamRecord); - - StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) resultObject; - - assertTrue(streamRecord.getValue() instanceof Either.Left); - - Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>) streamRecord.getValue(); - - Tuple2<Map<String, Event>, Long> leftResult = left.left(); - - assertEquals(watermarkTimestamp2, (long) leftResult.f1); - assertEquals(expectedSequence, leftResult.f0); - - Object watermark2 = result.poll(); - - assertTrue(watermark2 instanceof Watermark); - - assertEquals(watermarkTimestamp2, ((Watermark) watermark2).getTimestamp()); - } finally { - harness.close(); - } - } - private static class NFAFactory implements NFACompiler.NFAFactory<Event> { private static final long serialVersionUID = 1173020762472766713L; http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 4f4546e..f883ef5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.functions.NullByteKeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; @@ -1124,17 +1125,4 @@ public class AllWindowedStream<T, W extends Window> { public TypeInformation<T> getInputType() { return input.getType(); } - - /** - * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows. - * @param <T> - */ - private static class NullByteKeySelector<T> implements KeySelector<T, Byte> { - private static final long serialVersionUID = 1L; - - @Override - public Byte getKey(T value) throws Exception { - return 0; - } - } }
