[FLINK-5846] [cep] Make the CEP operators backwards compatible with Flink 1.1
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/521a53d9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/521a53d9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/521a53d9 Branch: refs/heads/master Commit: 521a53d9ad68a3f16a32e08843a6fca2bd4e439d Parents: d998ff9 Author: kl0u <[email protected]> Authored: Mon Feb 20 14:51:51 2017 +0100 Committer: kl0u <[email protected]> Committed: Fri Mar 17 16:32:41 2017 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/cep/nfa/NFA.java | 47 ++-- .../AbstractKeyedCEPPatternOperator.java | 85 +++++- .../flink/cep/operator/CEPOperatorUtils.java | 12 +- .../cep/operator/KeyedCEPPatternOperator.java | 5 +- .../TimeoutKeyedCEPPatternOperator.java | 5 +- .../cep/operator/CEPMigration11to13Test.java | 268 +++++++++++++++++++ .../flink/cep/operator/CEPOperatorTest.java | 6 +- .../flink/cep/operator/CEPRescalingTest.java | 3 +- .../src/test/resources/cep-keyed-snapshot-1.1 | Bin 0 -> 5612 bytes .../test/resources/cep-non-keyed-snapshot-1.1 | Bin 0 -> 3274 bytes .../MultiplexingStreamRecordSerializer.java | 229 ++++++++++++++++ .../streamrecord/StreamRecordSerializer.java | 150 +++++++++++ pom.xml | 2 + 13 files changed, 769 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 8d87fd8..12afe4e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -119,6 +119,8 @@ public 
class NFA<T> implements Serializable { */ private transient Queue<ComputationState<T>> computationStates; + private StateTransitionComparator<T> stateTransitionComparator; + public NFA( final TypeSerializer<T> eventSerializer, final long windowTime, @@ -131,6 +133,7 @@ public class NFA<T> implements Serializable { this.computationStates = new LinkedList<>(); this.states = new HashSet<>(); this.startEventCounter = 1; + this.stateTransitionComparator = new StateTransitionComparator<>(); } public Set<State<T>> getStates() { @@ -262,22 +265,6 @@ public class NFA<T> implements Serializable { } /** - * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state. - */ - private interface StateTransitionComparator<T> extends Comparator<StateTransition<T>>, Serializable {} - private final Comparator<StateTransition<T>> stateTransitionComparator = new StateTransitionComparator<T>() { - private static final long serialVersionUID = -2775474935413622278L; - - @Override - public int compare(final StateTransition<T> o1, final StateTransition<T> o2) { - if (o1.getAction() == o2.getAction()) { - return 0; - } - return o1.getAction() == StateTransitionAction.IGNORE ? 1 : -1; - } - }; - - /** * Computes the next computation states based on the given computation state, the current event, * its timestamp and the internal state machine. * @@ -301,6 +288,13 @@ public class NFA<T> implements Serializable { State<T> currentState = states.pop(); final List<StateTransition<T>> stateTransitions = new ArrayList<>(currentState.getStateTransitions()); + // this is for when we restore from legacy. In that case, the comparator is null + // as it did not exist in the previous Flink versions, so we have to initialize it here. 
+ + if (stateTransitionComparator == null) { + stateTransitionComparator = new StateTransitionComparator(); + } + // impose the IGNORE will be processed last Collections.sort(stateTransitions, stateTransitionComparator); @@ -601,10 +595,7 @@ public class NFA<T> implements Serializable { ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source)); try { - @SuppressWarnings("unchecked") - NFA<T> nfa = null; - nfa = (NFA<T>) ois.readObject(); - return nfa; + return (NFA<T>) ois.readObject(); } catch (ClassNotFoundException e) { throw new RuntimeException("Could not deserialize NFA.", e); } @@ -637,4 +628,20 @@ public class NFA<T> implements Serializable { return getClass().hashCode(); } } + + /** + * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state. + */ + private static final class StateTransitionComparator<T> implements Serializable, Comparator<StateTransition<T>> { + + private static final long serialVersionUID = -2775474935413622278L; + + @Override + public int compare(final StateTransition<T> o1, final StateTransition<T> o2) { + if (o1.getAction() == o2.getAction()) { + return 0; + } + return o1.getAction() == StateTransitionAction.IGNORE ? 
1 : -1; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index de7daea..b6d57cd 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -25,19 +25,25 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.api.operators.InternalWatermarkCallbackService; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OnWatermarkCallback; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import java.io.IOException; +import java.io.ObjectInputStream; import java.io.Serializable; import java.util.Objects; import java.util.PriorityQueue; @@ -55,7 +61,7 @@ import java.util.PriorityQueue; */ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends AbstractStreamOperator<OUT> - implements OneInputStreamOperator<IN, OUT> { + implements OneInputStreamOperator<IN, OUT>, CheckpointedRestoringOperator { private static final long serialVersionUID = -4166778210774160757L; @@ -82,18 +88,26 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>(); private final NFACompiler.NFAFactory<IN> nfaFactory; + /** + * A flag used in the case of migration that indicates if + * we are restoring from an old keyed or non-keyed operator. 
+ */ + private final boolean migratingFromOldKeyedOperator; + public AbstractKeyedCEPPatternOperator( final TypeSerializer<IN> inputSerializer, final boolean isProcessingTime, final KeySelector<IN, KEY> keySelector, final TypeSerializer<KEY> keySerializer, - final NFACompiler.NFAFactory<IN> nfaFactory) { + final NFACompiler.NFAFactory<IN> nfaFactory, + final boolean migratingFromOldKeyedOperator) { this.inputSerializer = Preconditions.checkNotNull(inputSerializer); this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime); this.keySelector = Preconditions.checkNotNull(keySelector); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.nfaFactory = Preconditions.checkNotNull(nfaFactory); + this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator; } public TypeSerializer<IN> getInputSerializer() { @@ -104,15 +118,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - // we have to call initializeState here and in the migration restore() - // method because the restore() (from legacy) is called before the - // initializeState(). 
- - initializeState(); - } - - private void initializeState() { - if (nfaOperatorState == null) { nfaOperatorState = getRuntimeContext().getState( new ValueStateDescriptor<>( @@ -252,6 +257,57 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> */ protected abstract void advanceTime(NFA<IN> nfa, long timestamp); + ////////////////////// Backwards Compatibility ////////////////////// + + @Override + public void restoreState(FSDataInputStream in) throws Exception { + // this is the flag indicating if we have udf + // state to restore (not needed here) + in.read(); + + DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(in); + InternalWatermarkCallbackService<KEY> watermarkCallbackService = getInternalWatermarkCallbackService(); + + if (migratingFromOldKeyedOperator) { + int numberEntries = inputView.readInt(); + for (int i = 0; i <numberEntries; i++) { + watermarkCallbackService.registerKeyForWatermarkCallback(keySerializer.deserialize(inputView)); + } + } else { + + final ObjectInputStream ois = new ObjectInputStream(in); + + // retrieve the NFA + @SuppressWarnings("unchecked") + NFA<IN> nfa = (NFA<IN>) ois.readObject(); + + // retrieve the elements that were pending in the priority queue + MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); + PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueFactory.createPriorityQueue(); + int entries = ois.readInt(); + for (int i = 0; i < entries; i++) { + StreamElement streamElement = recordSerializer.deserialize(inputView); + priorityQueue.offer(streamElement.<IN>asRecord()); + } + + // finally register the retrieved state with the new keyed state. 
+ setCurrentKey((byte) 0); + nfaOperatorState.update(nfa); + priorityQueueOperatorState.update(priorityQueue); + + if (!isProcessingTime) { + // this is relevant only for event/ingestion time + + // need to work around type restrictions + InternalWatermarkCallbackService rawWatermarkCallbackService = + (InternalWatermarkCallbackService) watermarkCallbackService; + + rawWatermarkCallbackService.registerKeyForWatermarkCallback((byte) 0); + } + ois.close(); + } + } + ////////////////////// Utility Classes ////////////////////// /** @@ -348,6 +404,11 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> } @Override + public boolean isCompatibleWith(TypeSerializer<?> other) { + return equals(other) || other instanceof AbstractKeyedCEPPatternOperator.PriorityQueueSerializer; + } + + @Override public boolean equals(Object obj) { if (obj instanceof PriorityQueueSerializer) { @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index 56ecb17..a5eef45 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -72,7 +72,8 @@ public class CEPOperatorUtils { isProcessingTime, keySelector, keySerializer, - nfaFactory)); + nfaFactory, + true)); } else { KeySelector<T, Byte> keySelector = new NullByteKeySelector<>(); @@ -86,7 +87,8 @@ public class CEPOperatorUtils { isProcessingTime, keySelector, keySerializer, - nfaFactory + nfaFactory, + false )).forceNonParallel(); } @@ -133,7 +135,8 @@ public class CEPOperatorUtils { 
isProcessingTime, keySelector, keySerializer, - nfaFactory)); + nfaFactory, + true)); } else { KeySelector<T, Byte> keySelector = new NullByteKeySelector<>(); @@ -147,7 +150,8 @@ public class CEPOperatorUtils { isProcessingTime, keySelector, keySerializer, - nfaFactory + nfaFactory, + false )).forceNonParallel(); } http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java index 5b6ffe2..21cee23 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java @@ -45,9 +45,10 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe boolean isProcessingTime, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, - NFACompiler.NFAFactory<IN> nfaFactory) { + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator) { - super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory); + super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java 
index 6889bb9..c6fba55 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java @@ -45,9 +45,10 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat boolean isProcessingTime, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, - NFACompiler.NFAFactory<IN> nfaFactory) { + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator) { - super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory); + super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java new file mode 100644 index 0000000..5a3e623 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.ByteSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.functions.NullByteKeySelector; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.Test; + +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CEPMigration11to13Test { + + private static String getResourceFilename(String filename) { + ClassLoader cl = CEPMigration11to13Test.class.getClassLoader(); + URL resource = cl.getResource(filename); + if (resource == null) { + throw new NullPointerException("Missing snapshot resource."); + } + return resource.getFile(); + } + + @Test + public void 
testKeyedCEPOperatorMigratation() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent = new Event(42, "start", 1.0); + final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); + final Event endEvent = new Event(42, "end", 1.0); + + // uncomment these lines for regenerating the snapshot on Flink 1.1 + /* + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory())); + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); + harness.open(); + harness.processElement(new StreamRecord<Event>(startEvent, 1)); + harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + harness.processWatermark(new Watermark(2)); + // simulate snapshot/restore with empty element queue but NFA state + StreamTaskState snapshot = harness.snapshot(1, 1); + FileOutputStream out = new FileOutputStream( + "src/test/resources/cep-keyed-snapshot-1.1"); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(snapshot); + out.close(); + harness.close(); + */ + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-snapshot-1.1")); + harness.open(); + + 
harness.processElement(new StreamRecord<Event>(middleEvent, 3)); + harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); + harness.processElement(new StreamRecord<>(endEvent, 5)); + + harness.processWatermark(new Watermark(20)); + + ConcurrentLinkedQueue<Object> result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject = result.poll(); + assertTrue(resultObject instanceof StreamRecord); + StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject; + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); + + assertEquals(startEvent, patternMap.get("start")); + assertEquals(middleEvent, patternMap.get("middle")); + assertEquals(endEvent, patternMap.get("end")); + + harness.close(); + } + + @Test + public void testNonKeyedCEPFunctionMigration() throws Exception { + + final Event startEvent = new Event(42, "start", 1.0); + final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); + final Event endEvent= new Event(42, "end", 1.0); + + // uncomment these lines for regenerating the snapshot on Flink 1.1 + /* + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>( + new CEPPatternOperator<>( + Event.createTypeSerializer(), + false, + new NFAFactory())); + harness.open(); + harness.processElement(new StreamRecord<Event>(startEvent, 1)); + harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + harness.processWatermark(new Watermark(2)); + + // simulate snapshot/restore with empty element queue but NFA state + StreamTaskState snapshot = harness.snapshot(1, 1); + FileOutputStream out = new FileOutputStream( + "src/test/resources/cep-non-keyed-snapshot-1.1"); + ObjectOutputStream oos = new 
ObjectOutputStream(out); + oos.writeObject(snapshot); + out.close(); + harness.close(); + */ + + NullByteKeySelector keySelector = new NullByteKeySelector(); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + ByteSerializer.INSTANCE, + new NFAFactory(), + false), + keySelector, + BasicTypeInfo.BYTE_TYPE_INFO); + + harness.setup(); + harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-snapshot-1.1")); + harness.open(); + + harness.processElement(new StreamRecord<Event>(middleEvent, 3)); + harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); + harness.processElement(new StreamRecord<>(endEvent, 5)); + + harness.processWatermark(new Watermark(Long.MAX_VALUE)); + + ConcurrentLinkedQueue<Object> result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject = result.poll(); + assertTrue(resultObject instanceof StreamRecord); + StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject; + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); + + assertEquals(startEvent, patternMap.get("start")); + assertEquals(middleEvent, patternMap.get("middle")); + assertEquals(endEvent, patternMap.get("end")); + + harness.close(); + } + + private static class NFAFactory implements NFACompiler.NFAFactory<Event> { + + private static final long serialVersionUID = 1173020762472766713L; + + private final boolean handleTimeout; + + private NFAFactory() { + this(false); + } + + private NFAFactory(boolean handleTimeout) { + this.handleTimeout = handleTimeout; + } + + @Override + public NFA<Event> createNFA() { + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter()) + 
.followedBy("middle").subtype(SubEvent.class).where(new MiddleFilter()) + .followedBy("end").where(new EndFilter()) + // add a window timeout to test whether timestamps of elements in the + // priority queue in CEP operator are correctly checkpointed/restored + .within(Time.milliseconds(10L)); + + return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); + } + } + + private static class StartFilter implements FilterFunction<Event> { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + } + + private static class MiddleFilter implements FilterFunction<SubEvent> { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + } + + private static class EndFilter implements FilterFunction<Event> { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 4ae74b9..a99db05 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -224,7 +224,8 @@ public class CEPOperatorTest extends TestLogger { false, keySelector, IntSerializer.INSTANCE, - new NFAFactory(true)), + new 
NFAFactory(true), + true), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -480,7 +481,8 @@ public class CEPOperatorTest extends TestLogger { isProcessingTime, keySelector, IntSerializer.INSTANCE, - new NFAFactory()); + new NFAFactory(), + true); } private static class TestKeySelector implements KeySelector<Event, Integer> { http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java index 78765c0..399662a 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java @@ -345,7 +345,8 @@ public class CEPRescalingTest { false, keySelector, BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()), - new NFAFactory()), + new NFAFactory(), + true), keySelector, BasicTypeInfo.INT_TYPE_INFO, maxParallelism, http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 b/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 new file mode 100644 index 0000000..277de1d Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 differ http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 new file mode 100644 index 0000000..b5ca51e Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 differ http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java new file mode 100644 index 0000000..a0f5a60 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.flink.migration.streaming.runtime.streamrecord; + + import org.apache.flink.api.common.typeutils.TypeSerializer; + import org.apache.flink.core.memory.DataInputView; + import org.apache.flink.core.memory.DataOutputView; + import org.apache.flink.streaming.api.watermark.Watermark; + import org.apache.flink.streaming.runtime.streamrecord.StreamElement; + import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + + import java.io.IOException; + + import static java.util.Objects.requireNonNull; + + public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> { + + + private static final long serialVersionUID = 1L; + + private static final int TAG_REC_WITH_TIMESTAMP = 0; + private static final int TAG_REC_WITHOUT_TIMESTAMP = 1; + private static final int TAG_WATERMARK = 2; + + + private final TypeSerializer<T> typeSerializer; + + + public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) { + if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) { + throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); + } + this.typeSerializer = requireNonNull(serializer); + } + + public TypeSerializer<T> getContainedTypeSerializer() { + return this.typeSerializer; + } + + // ------------------------------------------------------------------------ + // General serializer and type utils + // ------------------------------------------------------------------------ + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public MultiplexingStreamRecordSerializer<T> duplicate() { + TypeSerializer<T> copy = typeSerializer.duplicate(); + return (copy == typeSerializer) ?
this : new MultiplexingStreamRecordSerializer<T>(copy); + } + + // ------------------------------------------------------------------------ + // Type serialization, copying, instantiation + // ------------------------------------------------------------------------ + + @Override + public StreamRecord<T> createInstance() { + return new StreamRecord<T>(typeSerializer.createInstance()); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public StreamElement copy(StreamElement from) { + // deep-copy only the value; the timestamp is a plain long, presumably carried over by StreamRecord#copy + if (from.isRecord()) { + StreamRecord<T> fromRecord = from.asRecord(); + return fromRecord.copy(typeSerializer.copy(fromRecord.getValue())); + } + else if (from.isWatermark()) { + // is immutable + return from; + } + else { + throw new RuntimeException(); + } + } + + @Override + public StreamElement copy(StreamElement from, StreamElement reuse) { + if (from.isRecord() && reuse.isRecord()) { + StreamRecord<T> fromRecord = from.asRecord(); + StreamRecord<T> reuseRecord = reuse.asRecord(); + + T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()); + fromRecord.copyTo(valueCopy, reuseRecord); + return reuse; + } + else if (from.isWatermark()) { + // is immutable + return from; + } + else { + throw new RuntimeException("Cannot copy " + from + " -> " + reuse); + } + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int tag = source.readByte(); + target.write(tag); + + if (tag == TAG_REC_WITH_TIMESTAMP) { + // move timestamp + target.writeLong(source.readLong()); + typeSerializer.copy(source, target); + } + else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { + typeSerializer.copy(source, target); + } + else if (tag == TAG_WATERMARK) { + target.writeLong(source.readLong()); + } + else { + throw new IOException("Corrupt stream, found tag: " + tag); + } + } + + @Override + public void serialize(StreamElement value, DataOutputView target) throws IOException { + if
(value.isRecord()) { + StreamRecord<T> record = value.asRecord(); + + if (record.hasTimestamp()) { + target.write(TAG_REC_WITH_TIMESTAMP); + target.writeLong(record.getTimestamp()); + } else { + target.write(TAG_REC_WITHOUT_TIMESTAMP); + } + typeSerializer.serialize(record.getValue(), target); + } + else if (value.isWatermark()) { + target.write(TAG_WATERMARK); + target.writeLong(value.asWatermark().getTimestamp()); + } + else { + throw new RuntimeException(); + } + } + + @Override + public StreamElement deserialize(DataInputView source) throws IOException { + int tag = source.readByte(); + if (tag == TAG_REC_WITH_TIMESTAMP) { + long timestamp = source.readLong(); + return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp); + } + else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { + return new StreamRecord<T>(typeSerializer.deserialize(source)); + } + else if (tag == TAG_WATERMARK) { + return new Watermark(source.readLong()); + } + else { + throw new IOException("Corrupt stream, found tag: " + tag); + } + } + + @Override + public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException { + int tag = source.readByte(); + if (tag == TAG_REC_WITH_TIMESTAMP) { + long timestamp = source.readLong(); + T value = typeSerializer.deserialize(source); + StreamRecord<T> reuseRecord = reuse.asRecord(); + reuseRecord.replace(value, timestamp); + return reuseRecord; + } + else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { + T value = typeSerializer.deserialize(source); + StreamRecord<T> reuseRecord = reuse.asRecord(); + reuseRecord.replace(value); + return reuseRecord; + } + else if (tag == TAG_WATERMARK) { + return new Watermark(source.readLong()); + } + else { + throw new IOException("Corrupt stream, found tag: " + tag); + } + } + + // ------------------------------------------------------------------------ + // Equality and hashing + // ------------------------------------------------------------------------ + + @Override + public boolean equals(Object
obj) { + if (obj instanceof MultiplexingStreamRecordSerializer) { + MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj; + + return other.canEqual(this) && typeSerializer.equals(other.typeSerializer); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof MultiplexingStreamRecordSerializer; + } + + @Override + public int hashCode() { + return typeSerializer.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java new file mode 100644 index 0000000..0235ab8 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.flink.migration.streaming.runtime.streamrecord; + + import org.apache.flink.annotation.Internal; + import org.apache.flink.api.common.typeutils.TypeSerializer; + import org.apache.flink.core.memory.DataInputView; + import org.apache.flink.core.memory.DataOutputView; + import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + import org.apache.flink.util.Preconditions; + + import java.io.IOException; + + /** + * Serializer for {@link StreamRecord}. This version ignores timestamps and only (de)serializes + * the record's value. + * + * <p> + * {@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also + * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same + * stream with {@link StreamRecord StreamRecords}. + * + * @see MultiplexingStreamRecordSerializer + * + * @param <T> The type of value in the {@link StreamRecord} + */ + @Internal + public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> { + + private static final long serialVersionUID = 1L; + + private final TypeSerializer<T> typeSerializer; + + + public StreamRecordSerializer(TypeSerializer<T> serializer) { + if (serializer instanceof StreamRecordSerializer) { + throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); + } + this.typeSerializer = Preconditions.checkNotNull(serializer); + } + + public TypeSerializer<T> getContainedTypeSerializer() { + return this.typeSerializer; + } + + // ------------------------------------------------------------------------ + // General serializer and type utils + // ------------------------------------------------------------------------ + + @Override + public StreamRecordSerializer<T> duplicate() { + TypeSerializer<T> serializerCopy = typeSerializer.duplicate(); + return
serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy); + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public int getLength() { + return typeSerializer.getLength(); + } + + // ------------------------------------------------------------------------ + // Type serialization, copying, instantiation + // ------------------------------------------------------------------------ + + @Override + public StreamRecord<T> createInstance() { + try { + return new StreamRecord<T>(typeSerializer.createInstance()); + } catch (Exception e) { + throw new RuntimeException("Cannot instantiate StreamRecord.", e); + } + } + + @Override + public StreamRecord<T> copy(StreamRecord<T> from) { + return from.copy(typeSerializer.copy(from.getValue())); + } + + @Override + public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) { + from.copyTo(typeSerializer.copy(from.getValue(), reuse.getValue()), reuse); + return reuse; + } + + @Override + public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException { + typeSerializer.serialize(value.getValue(), target); + } + + @Override + public StreamRecord<T> deserialize(DataInputView source) throws IOException { + return new StreamRecord<T>(typeSerializer.deserialize(source)); + } + + @Override + public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException { + T element = typeSerializer.deserialize(reuse.getValue(), source); + reuse.replace(element); + return reuse; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + typeSerializer.copy(source, target); + } + + // ------------------------------------------------------------------------ + + @Override + public boolean equals(Object obj) { + if (obj instanceof StreamRecordSerializer) { + StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj; + + return other.canEqual(this) &&
typeSerializer.equals(other.typeSerializer); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof StreamRecordSerializer; + } + + @Override + public int hashCode() { + return typeSerializer.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 661a72d..538aa4b 100644 --- a/pom.xml +++ b/pom.xml @@ -911,6 +911,8 @@ under the License. <exclude>flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot</exclude> <exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint</exclude> <exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb</exclude> + <exclude>flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1</exclude> + <exclude>flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1</exclude> <!-- TweetInputFormat Test Data--> <exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude>
