[FLINK-6604] [cep] Remove Java serialization from the library.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a54d05e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a54d05e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a54d05e Branch: refs/heads/master Commit: 7a54d05ecd33b6dc140a7146f0efa90d64471f47 Parents: f7ebcb0 Author: kkloudas <kklou...@gmail.com> Authored: Tue May 16 17:07:29 2017 +0200 Committer: kkloudas <kklou...@gmail.com> Committed: Wed May 17 14:37:34 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/cep/nfa/DeweyNumber.java | 98 ++- .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++++++++++++++-- .../org/apache/flink/cep/nfa/SharedBuffer.java | 596 ++++++++++++++----- .../java/org/apache/flink/cep/nfa/State.java | 2 + .../flink/cep/nfa/compiler/NFACompiler.java | 81 ++- .../AbstractKeyedCEPPatternOperator.java | 16 +- .../java/org/apache/flink/cep/nfa/NFATest.java | 182 ++++-- .../apache/flink/cep/nfa/SharedBufferTest.java | 14 +- .../cep/operator/CEPFrom12MigrationTest.java | 99 ++- .../cep/operator/CEPMigration11to13Test.java | 102 +++- .../flink/cep/operator/CEPOperatorTest.java | 110 +++- 11 files changed, 1499 insertions(+), 318 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java index fd3fafa..3827956 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java @@ -18,6 +18,13 @@ package org.apache.flink.cep.nfa; +import 
org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; @@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable { deweyNumber = new int[]{start}; } - protected DeweyNumber(int[] deweyNumber) { - this.deweyNumber = deweyNumber; - } - public DeweyNumber(DeweyNumber number) { this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length); } + private DeweyNumber(int[] deweyNumber) { + this.deweyNumber = deweyNumber; + } + /** * Checks whether this dewey number is compatible to the other dewey number. * @@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable { return new DeweyNumber(deweyNumber); } } + + /** + * A {@link TypeSerializer} for the {@link DeweyNumber} which serves as a version number. 
+ */ + public static class DeweyNumberSerializer extends TypeSerializerSingleton<DeweyNumber> { + + private static final long serialVersionUID = -5086792497034943656L; + + private final IntSerializer elemSerializer = IntSerializer.INSTANCE; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public DeweyNumber createInstance() { + return new DeweyNumber(1); + } + + @Override + public DeweyNumber copy(DeweyNumber from) { + return new DeweyNumber(from); + } + + @Override + public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(DeweyNumber record, DataOutputView target) throws IOException { + final int size = record.length(); + target.writeInt(size); + for (int i = 0; i < size; i++) { + elemSerializer.serialize(record.deweyNumber[i], target); + } + } + + @Override + public DeweyNumber deserialize(DataInputView source) throws IOException { + final int size = source.readInt(); + int[] number = new int[size]; + for (int i = 0; i < size; i++) { + number[i] = elemSerializer.deserialize(source); + } + return new DeweyNumber(number); + } + + @Override + public DeweyNumber deserialize(DeweyNumber reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + final int size = source.readInt(); + target.writeInt(size); + for (int i = 0; i < size; i++) { + elemSerializer.copy(source, target); + } + } + + @Override + public boolean equals(Object obj) { + return obj == this || obj.getClass().equals(getClass()); + } + + @Override + public boolean canEqual(Object obj) { + return true; + } + + @Override + public int hashCode() { + return elemSerializer.hashCode(); + } + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index f2ade9e..ab5cd8e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -21,11 +21,17 @@ package org.apache.flink.cep.nfa; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.ListMultimap; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.EnumSerializer; import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; -import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; import org.apache.flink.cep.NonDuplicatingTypeSerializer; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.conditions.IterativeCondition; @@ -86,19 +92,35 @@ public class NFA<T> implements Serializable { private static final long serialVersionUID = 2957674889294717265L; - private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer; + ///////////////////// Backwards 
Compatibility Fields ///////////////////// /** - * Used only for backwards compatibility. Buffer used to store the matched events. + * @deprecated Used only for backwards compatibility. + * Look at the {@link #eventSharedBuffer} for its replacement. */ + @Deprecated private final SharedBuffer<State<T>, T> sharedBuffer = null; /** + * @deprecated Used only for backward compatibility. + */ + @Deprecated + private int startEventCounter; + + /** + * @deprecated Used only for backwards compatibility. + */ + @Deprecated + private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer; + + ////////////////// End of Backwards Compatibility Fields ////////////////// + + /** * A set of all the valid NFA states, as returned by the * {@link NFACompiler NFACompiler}. * These are directly derived from the user-specified pattern. */ - private final Set<State<T>> states; + private Set<State<T>> states; /** * The length of a windowed pattern, as specified using the @@ -114,11 +136,6 @@ public class NFA<T> implements Serializable { private final boolean handleTimeout; /** - * Used only for backward compatibility. - */ - private int startEventCounter; - - /** * Current set of {@link ComputationState computation states} within the state machine. * These are the "active" intermediate states that are waiting for new matching * events to transition to new valid states. @@ -126,19 +143,22 @@ public class NFA<T> implements Serializable { private transient Queue<ComputationState<T>> computationStates; /** - * Buffer used to store the matched events. + * Buffer used to store the matched events. 
*/ - private final SharedBuffer<String, T> stringSharedBuffer; + private SharedBuffer<String, T> eventSharedBuffer; + + private TypeSerializer<T> eventSerializer; public NFA( final TypeSerializer<T> eventSerializer, final long windowTime, final boolean handleTimeout) { + this.eventSerializer = eventSerializer; this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer); this.windowTime = windowTime; this.handleTimeout = handleTimeout; - this.stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); + this.eventSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); this.computationStates = new LinkedList<>(); this.states = new HashSet<>(); } @@ -169,7 +189,7 @@ public class NFA<T> implements Serializable { * {@code false} otherwise. */ public boolean isEmpty() { - return stringSharedBuffer.isEmpty(); + return eventSharedBuffer.isEmpty(); } /** @@ -207,7 +227,7 @@ public class NFA<T> implements Serializable { timeoutResult.add(Tuple2.of(timedoutPattern, timestamp)); } - stringSharedBuffer.release( + eventSharedBuffer.release( computationState.getPreviousState().getName(), computationState.getEvent(), computationState.getTimestamp(), @@ -231,7 +251,7 @@ public class NFA<T> implements Serializable { result.add(matchedPattern); // remove found patterns because they are no longer needed - stringSharedBuffer.release( + eventSharedBuffer.release( newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp(), @@ -239,7 +259,7 @@ public class NFA<T> implements Serializable { } else if (newComputationState.isStopState()) { //reached stop state. 
release entry for the stop state shouldDiscardPath = true; - stringSharedBuffer.release( + eventSharedBuffer.release( newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp(), @@ -254,7 +274,7 @@ public class NFA<T> implements Serializable { // a stop state was reached in this branch. release branch which results in removing previous event from // the buffer for (final ComputationState<T> state : statesToRetain) { - stringSharedBuffer.release( + eventSharedBuffer.release( state.getPreviousState().getName(), state.getEvent(), state.getTimestamp(), @@ -275,7 +295,7 @@ public class NFA<T> implements Serializable { // remove all elements which are expired // with respect to the window length - stringSharedBuffer.prune(pruningTimestamp); + eventSharedBuffer.prune(pruningTimestamp); } } @@ -289,7 +309,7 @@ public class NFA<T> implements Serializable { NFA<T> other = (NFA<T>) obj; return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) && - stringSharedBuffer.equals(other.stringSharedBuffer) && + eventSharedBuffer.equals(other.eventSharedBuffer) && states.equals(other.states) && windowTime == other.windowTime; } else { @@ -299,7 +319,7 @@ public class NFA<T> implements Serializable { @Override public int hashCode() { - return Objects.hash(nonDuplicatingTypeSerializer, stringSharedBuffer, states, windowTime); + return Objects.hash(nonDuplicatingTypeSerializer, eventSharedBuffer, states, windowTime); } private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) { @@ -445,14 +465,14 @@ public class NFA<T> implements Serializable { final long startTimestamp; if (computationState.isStartState()) { startTimestamp = timestamp; - counter = stringSharedBuffer.put( + counter = eventSharedBuffer.put( currentState.getName(), event, timestamp, currentVersion); } else { startTimestamp = computationState.getStartTimestamp(); - counter = stringSharedBuffer.put( + counter = 
eventSharedBuffer.put( currentState.getName(), event, timestamp, @@ -502,7 +522,7 @@ public class NFA<T> implements Serializable { if (computationState.getEvent() != null) { // release the shared entry referenced by the current computation state. - stringSharedBuffer.release( + eventSharedBuffer.release( computationState.getPreviousState().getName(), computationState.getEvent(), computationState.getTimestamp(), @@ -524,7 +544,7 @@ public class NFA<T> implements Serializable { ComputationState<T> computationState = ComputationState.createState( this, currentState, previousState, event, counter, timestamp, version, startTimestamp); computationStates.add(computationState); - stringSharedBuffer.lock(previousState.getName(), event, timestamp, counter); + eventSharedBuffer.lock(previousState.getName(), event, timestamp, counter); } private State<T> findFinalStateAfterProceed(State<T> state, T event, ComputationState<T> computationState) { @@ -609,7 +629,12 @@ public class NFA<T> implements Serializable { return new HashMap<>(); } - Collection<ListMultimap<String, T>> paths = stringSharedBuffer.extractPatterns( + // the following is used when migrating from previous versions. + if (eventSerializer == null) { + eventSerializer = nonDuplicatingTypeSerializer.getTypeSerializer(); + } + + Collection<ListMultimap<String, T>> paths = eventSharedBuffer.extractPatterns( computationState.getPreviousState().getName(), computationState.getEvent(), computationState.getTimestamp(), @@ -619,19 +644,22 @@ public class NFA<T> implements Serializable { // for a given computation state, we cannot have more than one matching patterns. 
Preconditions.checkState(paths.size() <= 1); - TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer(); - Map<String, List<T>> result = new HashMap<>(); for (ListMultimap<String, T> path: paths) { for (String key: path.keySet()) { List<T> events = path.get(key); - List<T> values = new ArrayList<>(events.size()); + String originalKey = NFACompiler.getOriginalStateNameFromInternal(key); + List<T> values = result.get(originalKey); + if (values == null) { + values = new ArrayList<>(events.size()); + } + for (T event: events) { // copy the element so that the user can change it - values.add(serializer.isImmutableType() ? event : serializer.copy(event)); + values.add(eventSerializer.isImmutableType() ? event : eventSerializer.copy(event)); } - result.put(key, values); + result.put(originalKey, values); } } return result; @@ -639,18 +667,6 @@ public class NFA<T> implements Serializable { ////////////////////// Fault-Tolerance / Migration ////////////////////// - private void writeObject(ObjectOutputStream oos) throws IOException { - oos.defaultWriteObject(); - - oos.writeInt(computationStates.size()); - - for(ComputationState<T> computationState: computationStates) { - writeComputationState(computationState, oos); - } - - nonDuplicatingTypeSerializer.clearReferences(); - } - private final static String BEGINNING_STATE_NAME = "$beginningState$"; private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { @@ -676,7 +692,7 @@ public class NFA<T> implements Serializable { try { //Backwards compatibility this.computationStates.addAll(migrateNFA(readComputationStates)); - final Field newSharedBufferField = NFA.class.getDeclaredField("stringSharedBuffer"); + final Field newSharedBufferField = NFA.class.getDeclaredField("eventSharedBuffer"); final Field sharedBufferField = NFA.class.getDeclaredField("sharedBuffer"); sharedBufferField.setAccessible(true); newSharedBufferField.setAccessible(true); @@ -760,24 +776,6 @@ public 
class NFA<T> implements Serializable { return computationStates; } - private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException { - oos.writeObject(computationState.getState()); - oos.writeObject(computationState.getPreviousState()); - oos.writeLong(computationState.getTimestamp()); - oos.writeObject(computationState.getVersion()); - oos.writeLong(computationState.getStartTimestamp()); - - if (computationState.getEvent() == null) { - // write that we don't have an event associated - oos.writeBoolean(false); - } else { - // write that we have an event associated - oos.writeBoolean(true); - DataOutputViewStreamWrapper output = new DataOutputViewStreamWrapper(oos); - nonDuplicatingTypeSerializer.serialize(computationState.getEvent(), output); - } - } - @SuppressWarnings("unchecked") private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException { final State<T> state = (State<T>)ois.readObject(); @@ -805,7 +803,397 @@ public class NFA<T> implements Serializable { return ComputationState.createState(this, state, previousState, event, 0, timestamp, version, startTimestamp); } - ////////////////////// Serialization ////////////////////// + ////////////////////// New Serialization ////////////////////// + + /** + * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state. + */ + public static final class NFASerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** This empty constructor is required for deserializing the configuration. 
*/ + public NFASerializerConfigSnapshot() {} + + public NFASerializerConfigSnapshot( + TypeSerializerConfigSnapshot sharedBufferSerializerConfigSnapshot, + TypeSerializerConfigSnapshot eventSerializerConfigSnapshot) { + + super(sharedBufferSerializerConfigSnapshot, eventSerializerConfigSnapshot); + } + + @Override + public int getVersion() { + return VERSION; + } + } + + /** + * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization. + */ + public static class NFASerializer<T> extends TypeSerializer<NFA<T>> { + + private static final long serialVersionUID = 2098282423980597010L; + + private final TypeSerializer<SharedBuffer<String, T>> sharedBufferSerializer; + + private final TypeSerializer<T> eventSerializer; + + public NFASerializer(TypeSerializer<T> typeSerializer) { + this(typeSerializer, new SharedBuffer.SharedBufferSerializer<>(StringSerializer.INSTANCE, typeSerializer)); + } + + public NFASerializer( + TypeSerializer<T> typeSerializer, + TypeSerializer<SharedBuffer<String, T>> sharedBufferSerializer) { + this.eventSerializer = typeSerializer; + this.sharedBufferSerializer = sharedBufferSerializer; + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<NFA<T>> duplicate() { + return this; + } + + @Override + public NFA<T> createInstance() { + return null; + } + + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + ois.defaultReadObject(); + } + + @Override + public NFA<T> copy(NFA<T> from) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + serialize(from, new DataOutputViewStreamWrapper(oos)); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA<T> copy = deserialize(new 
DataInputViewStreamWrapper(ois)); + ois.close(); + bais.close(); + return copy; + } catch (IOException e) { + throw new RuntimeException("Could not copy NFA.", e); + } + } + + @Override + public NFA<T> copy(NFA<T> from, NFA<T> reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(NFA<T> record, DataOutputView target) throws IOException { + serializeStates(record.states, target); + target.writeLong(record.windowTime); + target.writeBoolean(record.handleTimeout); + + sharedBufferSerializer.serialize(record.eventSharedBuffer, target); + + target.writeInt(record.computationStates.size()); + + StringSerializer stateNameSerializer = StringSerializer.INSTANCE; + LongSerializer timestampSerializer = LongSerializer.INSTANCE; + DeweyNumber.DeweyNumberSerializer versionSerializer = new DeweyNumber.DeweyNumberSerializer(); + + for (ComputationState<T> computationState: record.computationStates) { + stateNameSerializer.serialize(computationState.getState().getName(), target); + stateNameSerializer.serialize(computationState.getPreviousState() == null + ? 
null : computationState.getPreviousState().getName(), target); + + timestampSerializer.serialize(computationState.getTimestamp(), target); + versionSerializer.serialize(computationState.getVersion(), target); + timestampSerializer.serialize(computationState.getStartTimestamp(), target); + + if (computationState.getEvent() == null) { + target.writeBoolean(false); + } else { + target.writeBoolean(true); + eventSerializer.serialize(computationState.getEvent(), target); + } + } + } + + @Override + public NFA<T> deserialize(DataInputView source) throws IOException { + Set<State<T>> states = deserializeStates(source); + long windowTime = source.readLong(); + boolean handleTimeout = source.readBoolean(); + + NFA<T> nfa = new NFA<>(eventSerializer, windowTime, handleTimeout); + nfa.states = states; + + nfa.eventSharedBuffer = sharedBufferSerializer.deserialize(source); + + Queue<ComputationState<T>> computationStates = new LinkedList<>(); + StringSerializer stateNameSerializer = StringSerializer.INSTANCE; + LongSerializer timestampSerializer = LongSerializer.INSTANCE; + DeweyNumber.DeweyNumberSerializer versionSerializer = new DeweyNumber.DeweyNumberSerializer(); + + int computationStateNo = source.readInt(); + for (int i = 0; i < computationStateNo; i++) { + State<T> state = getStateByName(stateNameSerializer.deserialize(source), nfa); + State<T> prevState = getStateByName(stateNameSerializer.deserialize(source), nfa); + long timestamp = timestampSerializer.deserialize(source); + DeweyNumber version = versionSerializer.deserialize(source); + long startTimestamp = timestampSerializer.deserialize(source); + + T event = null; + if (source.readBoolean()) { + event = eventSerializer.deserialize(source); + } + + computationStates.add(ComputationState.createState( + nfa, state, prevState, event, 0, timestamp, version, startTimestamp)); + } + + nfa.computationStates = computationStates; + return nfa; + } + + private State<T> getStateByName(String name, NFA<T> nfa) { + for 
(State<T> state: nfa.states) { + if (state.getName().equals(name)) { + return state; + } + } + return null; + } + + @Override + public NFA<T> deserialize(NFA<T> reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + Set<State<T>> states = deserializeStates(source); + serializeStates(states, target); + + long windowTime = source.readLong(); + target.writeLong(windowTime); + + boolean handleTimeout = source.readBoolean(); + target.writeBoolean(handleTimeout); + + SharedBuffer<String, T> sharedBuffer = sharedBufferSerializer.deserialize(source); + sharedBufferSerializer.serialize(sharedBuffer, target); + + StringSerializer stateNameSerializer = StringSerializer.INSTANCE; + LongSerializer timestampSerializer = LongSerializer.INSTANCE; + DeweyNumber.DeweyNumberSerializer versionSerializer = new DeweyNumber.DeweyNumberSerializer(); + + int computationStateNo = source.readInt(); + target.writeInt(computationStateNo); + + for (int i = 0; i < computationStateNo; i++) { + String stateName = stateNameSerializer.deserialize(source); + stateNameSerializer.serialize(stateName, target); + + String prevStateName = stateNameSerializer.deserialize(source); + stateNameSerializer.serialize(prevStateName, target); + + long timestamp = timestampSerializer.deserialize(source); + timestampSerializer.serialize(timestamp, target); + + DeweyNumber version = versionSerializer.deserialize(source); + versionSerializer.serialize(version, target); + + long startTimestamp = timestampSerializer.deserialize(source); + timestampSerializer.serialize(startTimestamp, target); + + boolean hasEvent = source.readBoolean(); + target.writeBoolean(hasEvent); + if (hasEvent) { + T event = eventSerializer.deserialize(source); + eventSerializer.serialize(event, target); + } + } + } + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj != null && 
obj.getClass().equals(getClass()) && + sharedBufferSerializer.equals(((NFASerializer) obj).sharedBufferSerializer) && + eventSerializer.equals(((NFASerializer) obj).eventSerializer)); + } + + @Override + public boolean canEqual(Object obj) { + return true; + } + + @Override + public int hashCode() { + return 37 * sharedBufferSerializer.hashCode() + eventSerializer.hashCode(); + } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return new NFASerializerConfigSnapshot( + eventSerializer.snapshotConfiguration(), + sharedBufferSerializer.snapshotConfiguration() + ); + } + public CompatibilityResult<NFA<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof NFASerializerConfigSnapshot) { + TypeSerializerConfigSnapshot[] serializerConfigSnapshots = + ((NFASerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); + + CompatibilityResult<T> elementCompatResult = + eventSerializer.ensureCompatibility(serializerConfigSnapshots[0]); + CompatibilityResult<SharedBuffer<String, T>> sharedBufCompatResult = + sharedBufferSerializer.ensureCompatibility(serializerConfigSnapshots[1]); + + if (!sharedBufCompatResult.isRequiresMigration() && !elementCompatResult.isRequiresMigration()) { + return CompatibilityResult.compatible(); + } else { + if (elementCompatResult.getConvertDeserializer() != null && + sharedBufCompatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new NFASerializer<>( + new TypeDeserializerAdapter<>(elementCompatResult.getConvertDeserializer()), + new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer()))); + } + } + } + + return CompatibilityResult.requiresMigration(null); + } + + private void serializeStates(Set<State<T>> states, DataOutputView out) throws IOException { + TypeSerializer<String> nameSerializer = StringSerializer.INSTANCE; + TypeSerializer<State.StateType> stateTypeSerializer = new 
EnumSerializer<>(State.StateType.class); + TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class); + + out.writeInt(states.size()); + for (State<T> state: states) { + nameSerializer.serialize(state.getName(), out); + stateTypeSerializer.serialize(state.getStateType(), out); + } + + for (State<T> state: states) { + nameSerializer.serialize(state.getName(), out); + + out.writeInt(state.getStateTransitions().size()); + for (StateTransition<T> transition : state.getStateTransitions()) { + nameSerializer.serialize(transition.getSourceState().getName(), out); + nameSerializer.serialize(transition.getTargetState().getName(), out); + actionSerializer.serialize(transition.getAction(), out); + + serializeCondition(transition.getCondition(), out); + } + } + } + + private Set<State<T>> deserializeStates(DataInputView in) throws IOException { + TypeSerializer<String> nameSerializer = StringSerializer.INSTANCE; + TypeSerializer<State.StateType> stateTypeSerializer = new EnumSerializer<>(State.StateType.class); + TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class); + + + final int noOfStates = in.readInt(); + Map<String, State<T>> states = new HashMap<>(noOfStates); + + for (int i = 0; i < noOfStates; i++) { + String stateName = nameSerializer.deserialize(in); + State.StateType stateType = stateTypeSerializer.deserialize(in); + + State<T> state = new State<>(stateName, stateType); + states.put(stateName, state); + } + + for (int i = 0; i < noOfStates; i++) { + String srcName = nameSerializer.deserialize(in); + + int noOfTransitions = in.readInt(); + for (int j = 0; j < noOfTransitions; j++) { + String src = nameSerializer.deserialize(in); + Preconditions.checkState(src.equals(srcName), + "Source Edge names do not match (" + srcName + " - " + src + ")."); + + String trgt = nameSerializer.deserialize(in); + StateTransitionAction action = actionSerializer.deserialize(in); + + 
IterativeCondition<T> condition = null; + try { + condition = deserializeCondition(in); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + + State<T> srcState = states.get(src); + State<T> trgtState = states.get(trgt); + srcState.addStateTransition(action, trgtState, condition); + } + + } + return new HashSet<>(states.values()); + } + + private void serializeCondition(IterativeCondition<T> condition, DataOutputView out) throws IOException { + out.writeBoolean(condition != null); + if (condition != null) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(condition); + + oos.close(); + baos.close(); + + byte[] serCondition = baos.toByteArray(); + out.writeInt(serCondition.length); + out.write(serCondition); + } + } + + private IterativeCondition<T> deserializeCondition(DataInputView in) throws IOException, ClassNotFoundException { + boolean hasCondition = in.readBoolean(); + if (hasCondition) { + int length = in.readInt(); + + byte[] serCondition = new byte[length]; + in.read(serCondition); + + ByteArrayInputStream bais = new ByteArrayInputStream(serCondition); + ObjectInputStream ois = new ObjectInputStream(bais); + + IterativeCondition<T> condition = (IterativeCondition<T>) ois.readObject(); + ois.close(); + bais.close(); + + return condition; + } + return null; + } + } + + ////////////////// Old Serialization ////////////////////// /** * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization. 
@@ -862,10 +1250,7 @@ public class NFA<T> implements Serializable { @Override public void serialize(NFA<T> record, DataOutputView target) throws IOException { - try (ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target))) { - oos.writeObject(record); - oos.flush(); - } + throw new UnsupportedOperationException("This is the deprecated serialization strategy."); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index dcf5665..ab134d0 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -22,10 +22,20 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.cep.NonDuplicatingTypeSerializer; +import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import 
java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -35,6 +45,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -63,6 +74,10 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { private static final long serialVersionUID = 9213251042562206495L; + /** + * @deprecated This serializer is only used for backwards compatibility. + */ + @Deprecated private final TypeSerializer<V> valueSerializer; private transient Map<K, SharedBufferPage<K, V>> pages; @@ -72,6 +87,12 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { this.pages = new HashMap<>(); } + public TypeSerializer<V> getValueSerializer() { + return (valueSerializer instanceof NonDuplicatingTypeSerializer) + ? ((NonDuplicatingTypeSerializer) valueSerializer).getTypeSerializer() + : valueSerializer; + } + /** * Stores given value (value + timestamp) under the given key. It assigns a preceding element * relation to the entry which is defined by the previous key, value (value + timestamp). 
@@ -293,155 +314,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { } } - private void writeObject(ObjectOutputStream oos) throws IOException { - DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(oos); - Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>(); - int totalEdges = 0; - int entryCounter = 0; - - oos.defaultWriteObject(); - - // number of pages - oos.writeInt(pages.size()); - - for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) { - SharedBufferPage<K, V> page = pageEntry.getValue(); - - // key for the current page - oos.writeObject(page.getKey()); - // number of page entries - oos.writeInt(page.entries.size()); - - for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) { - // serialize the sharedBufferEntry - SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue(); - - // assign id to the sharedBufferEntry for the future serialization of the previous - // relation - entryIDs.put(sharedBuffer, entryCounter++); - - ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime(); - - valueSerializer.serialize(valueTimeWrapper.value, target); - oos.writeLong(valueTimeWrapper.getTimestamp()); - oos.writeInt(valueTimeWrapper.getCounter()); - - int edges = sharedBuffer.edges.size(); - totalEdges += edges; - - oos.writeInt(sharedBuffer.referenceCounter); - } - } - - // write the edges between the shared buffer entries - oos.writeInt(totalEdges); - - for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) { - SharedBufferPage<K, V> page = pageEntry.getValue(); - - for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) { - SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue(); - - if (!entryIDs.containsKey(sharedBuffer)) { - throw new RuntimeException("Could not find id for entry: " + sharedBuffer); - } else { - int id = 
entryIDs.get(sharedBuffer); - - for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) { - // in order to serialize the previous relation we simply serialize the ids - // of the source and target SharedBufferEntry - if (edge.target != null) { - if (!entryIDs.containsKey(edge.getTarget())) { - throw new RuntimeException("Could not find id for entry: " + edge.getTarget()); - } else { - int targetId = entryIDs.get(edge.getTarget()); - - oos.writeInt(id); - oos.writeInt(targetId); - oos.writeObject(edge.version); - } - } else { - oos.writeInt(id); - oos.writeInt(-1); - oos.writeObject(edge.version); - } - } - } - } - } - } - - private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { - DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(ois); - ArrayList<SharedBufferEntry<K, V>> entryList = new ArrayList<>(); - ois.defaultReadObject(); - - this.pages = new HashMap<>(); - - int numberPages = ois.readInt(); - - for (int i = 0; i < numberPages; i++) { - // key of the page - @SuppressWarnings("unchecked") - K key = (K)ois.readObject(); - - SharedBufferPage<K, V> page = new SharedBufferPage<>(key); - - pages.put(key, page); - - int numberEntries = ois.readInt(); - - for (int j = 0; j < numberEntries; j++) { - // restore the SharedBufferEntries for the given page - V value = valueSerializer.deserialize(source); - long timestamp = ois.readLong(); - int counter = ois.readInt(); - - ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp, counter); - SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page); - - sharedBufferEntry.referenceCounter = ois.readInt(); - - page.entries.put(valueTimeWrapper, sharedBufferEntry); - - entryList.add(sharedBufferEntry); - } - } - - // read the edges of the shared buffer entries - int numberEdges = ois.readInt(); - - for (int j = 0; j < numberEdges; j++) { - int sourceIndex = ois.readInt(); - int targetIndex = ois.readInt(); - - if 
(sourceIndex >= entryList.size() || sourceIndex < 0) { - throw new RuntimeException("Could not find source entry with index " + sourceIndex + - ". This indicates a corrupted state."); - } else { - // We've already deserialized the shared buffer entry. Simply read its ID and - // retrieve the buffer entry from the list of entries - SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex); - - final DeweyNumber version = (DeweyNumber) ois.readObject(); - final SharedBufferEntry<K, V> target; - - if (targetIndex >= 0) { - if (targetIndex >= entryList.size()) { - throw new RuntimeException("Could not find target entry with index " + targetIndex + - ". This indicates a corrupted state."); - } else { - target = entryList.get(targetIndex); - } - } else { - target = null; - } - - sourceEntry.edges.add(new SharedBufferEdge<K, V>(target, version)); - } - } - } - private SharedBuffer( TypeSerializer<V> valueSerializer, Map<K, SharedBufferPage<K, V>> pages) { @@ -523,7 +395,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { public String toString() { StringBuilder builder = new StringBuilder(); - for(Map.Entry<K, SharedBufferPage<K, V>> entry :pages.entrySet()){ + for(Map.Entry<K, SharedBufferPage<K, V>> entry: pages.entrySet()){ builder.append("Key: ").append(entry.getKey()).append("\n"); builder.append("Value: ").append(entry.getValue()).append("\n"); } @@ -537,7 +409,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { @SuppressWarnings("unchecked") SharedBuffer<K, V> other = (SharedBuffer<K, V>) obj; - return pages.equals(other.pages) && valueSerializer.equals(other.valueSerializer); + return pages.equals(other.pages) && getValueSerializer().equals(other.getValueSerializer()); } else { return false; } @@ -545,7 +417,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { @Override public int hashCode() { - return Objects.hash(pages, valueSerializer); + return 
Objects.hash(pages, getValueSerializer()); } /** @@ -931,4 +803,424 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { return "ExtractionState(" + entry + ", " + version + ", [" + StringUtils.join(path, ", ") + "])"; } } + + ////////////// New Serialization //////////////////// + + /** + * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state. + */ + public static final class SharedBufferSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** This empty constructor is required for deserializing the configuration. */ + public SharedBufferSerializerConfigSnapshot() {} + + public SharedBufferSerializerConfigSnapshot( + TypeSerializerConfigSnapshot keySerializerConfigSnapshot, + TypeSerializerConfigSnapshot valueSerializerConfigSnapshot, + TypeSerializerConfigSnapshot versionSerializerConfigSnapshot) { + + super(keySerializerConfigSnapshot, valueSerializerConfigSnapshot, versionSerializerConfigSnapshot); + } + + @Override + public int getVersion() { + return VERSION; + } + } + + /** + * A {@link TypeSerializer} for the {@link SharedBuffer}. 
+ */ + public static class SharedBufferSerializer<K extends Serializable, V> extends TypeSerializer<SharedBuffer<K, V>> { + + private static final long serialVersionUID = -3254176794680331560L; + + private final TypeSerializer<K> keySerializer; + private final TypeSerializer<V> valueSerializer; + private final TypeSerializer<DeweyNumber> versionSerializer; + + public SharedBufferSerializer( + TypeSerializer<K> keySerializer, + TypeSerializer<V> valueSerializer) { + this(keySerializer, valueSerializer, new DeweyNumber.DeweyNumberSerializer()); + } + + public SharedBufferSerializer( + TypeSerializer<K> keySerializer, + TypeSerializer<V> valueSerializer, + TypeSerializer<DeweyNumber> versionSerializer) { + + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + this.versionSerializer = versionSerializer; + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<SharedBuffer<K, V>> duplicate() { + return new SharedBufferSerializer<>(keySerializer, valueSerializer); + } + + @Override + public SharedBuffer<K, V> createInstance() { + return new SharedBuffer<>(new NonDuplicatingTypeSerializer<V>(valueSerializer)); + } + + @Override + public SharedBuffer<K, V> copy(SharedBuffer from) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + serialize(from, new DataOutputViewStreamWrapper(oos)); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + SharedBuffer<K, V> copy = deserialize(new DataInputViewStreamWrapper(ois)); + ois.close(); + bais.close(); + + return copy; + } catch (IOException e) { + throw new RuntimeException("Could not copy SharedBuffer.", e); + } + } + + @Override + public SharedBuffer<K, V> copy(SharedBuffer from, SharedBuffer reuse)
{ + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(SharedBuffer record, DataOutputView target) throws IOException { + Map<K, SharedBufferPage<K, V>> pages = record.pages; + Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>(); + + int totalEdges = 0; + int entryCounter = 0; + + // number of pages + target.writeInt(pages.size()); + + for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) { + SharedBufferPage<K, V> page = pageEntry.getValue(); + + // key for the current page + keySerializer.serialize(page.getKey(), target); + + // number of page entries + target.writeInt(page.entries.size()); + + for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) { + SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue(); + + // assign id to the sharedBufferEntry for the future + // serialization of the previous relation + entryIDs.put(sharedBuffer, entryCounter++); + + ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime(); + + valueSerializer.serialize(valueTimeWrapper.value, target); + target.writeLong(valueTimeWrapper.getTimestamp()); + target.writeInt(valueTimeWrapper.getCounter()); + + int edges = sharedBuffer.edges.size(); + totalEdges += edges; + + target.writeInt(sharedBuffer.referenceCounter); + } + } + + // write the edges between the shared buffer entries + target.writeInt(totalEdges); + + for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) { + SharedBufferPage<K, V> page = pageEntry.getValue(); + + for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) { + SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue(); + + Integer id = entryIDs.get(sharedBuffer); + Preconditions.checkState(id != null, "Could not find id for entry: " + sharedBuffer); + + for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) { + // in order 
to serialize the previous relation we simply serialize the ids + // of the source and target SharedBufferEntry + if (edge.target != null) { + Integer targetId = entryIDs.get(edge.getTarget()); + Preconditions.checkState(targetId != null, + "Could not find id for entry: " + edge.getTarget()); + + target.writeInt(id); + target.writeInt(targetId); + versionSerializer.serialize(edge.version, target); + } else { + target.writeInt(id); + target.writeInt(-1); + versionSerializer.serialize(edge.version, target); + } + } + } + } + } + + @Override + public SharedBuffer deserialize(DataInputView source) throws IOException { + List<SharedBufferEntry<K, V>> entryList = new ArrayList<>(); + Map<K, SharedBufferPage<K, V>> pages = new HashMap<>(); + + int totalPages = source.readInt(); + + for (int i = 0; i < totalPages; i++) { + // key of the page + @SuppressWarnings("unchecked") + K key = keySerializer.deserialize(source); + + SharedBufferPage<K, V> page = new SharedBufferPage<>(key); + + pages.put(key, page); + + int numberEntries = source.readInt(); + + for (int j = 0; j < numberEntries; j++) { + // restore the SharedBufferEntries for the given page + V value = valueSerializer.deserialize(source); + long timestamp = source.readLong(); + int counter = source.readInt(); + + ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp, counter); + SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page); + + sharedBufferEntry.referenceCounter = source.readInt(); + + page.entries.put(valueTimeWrapper, sharedBufferEntry); + + entryList.add(sharedBufferEntry); + } + } + + // read the edges of the shared buffer entries + int totalEdges = source.readInt(); + + for (int j = 0; j < totalEdges; j++) { + int sourceIndex = source.readInt(); + Preconditions.checkState(sourceIndex < entryList.size() && sourceIndex >= 0, + "Could not find source entry with index " + sourceIndex + ". 
This indicates a corrupted state."); + + int targetIndex = source.readInt(); + Preconditions.checkState(targetIndex < entryList.size(), + "Could not find target entry with index " + targetIndex + ". This indicates a corrupted state."); + + DeweyNumber version = versionSerializer.deserialize(source); + + // We've already deserialized the shared buffer entry. Simply read its ID and + // retrieve the buffer entry from the list of entries + SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex); + SharedBufferEntry<K, V> targetEntry = targetIndex < 0 ? null : entryList.get(targetIndex); + + sourceEntry.edges.add(new SharedBufferEdge<>(targetEntry, version)); + } + // here we put the old NonDuplicating serializer because this needs to create a copy + // of the buffer, as created by the NFA. There, for compatibility reasons, we have left + // the old serializer. + return new SharedBuffer(new NonDuplicatingTypeSerializer(valueSerializer), pages); + } + + @Override + public SharedBuffer deserialize(SharedBuffer reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int numberPages = source.readInt(); + target.writeInt(numberPages); + + for (int i = 0; i < numberPages; i++) { + // key of the page + @SuppressWarnings("unchecked") + K key = keySerializer.deserialize(source); + keySerializer.serialize(key, target); + + int numberEntries = source.readInt(); + target.writeInt(numberEntries); + + for (int j = 0; j < numberEntries; j++) { + // restore the SharedBufferEntries for the given page + V value = valueSerializer.deserialize(source); + valueSerializer.serialize(value, target); + + long timestamp = source.readLong(); + target.writeLong(timestamp); + + int counter = source.readInt(); + target.writeInt(counter); + + int referenceCounter = source.readInt(); + target.writeInt(referenceCounter); + } + } + + // read the edges of the shared buffer entries + int numberEdges =
source.readInt(); + target.writeInt(numberEdges); + + for (int j = 0; j < numberEdges; j++) { + int sourceIndex = source.readInt(); + int targetIndex = source.readInt(); + + target.writeInt(sourceIndex); + target.writeInt(targetIndex); + + DeweyNumber version = versionSerializer.deserialize(source); + versionSerializer.serialize(version, target); + } + } + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj != null && obj.getClass().equals(getClass()) && + keySerializer.equals(((SharedBufferSerializer<?, ?>) obj).keySerializer) && + valueSerializer.equals(((SharedBufferSerializer<?, ?>) obj).valueSerializer) && + versionSerializer.equals(((SharedBufferSerializer<?, ?>) obj).versionSerializer)); + } + + @Override + public boolean canEqual(Object obj) { + return true; + } + + @Override + public int hashCode() { + return 37 * keySerializer.hashCode() + valueSerializer.hashCode(); + } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return new SharedBufferSerializerConfigSnapshot( + keySerializer.snapshotConfiguration(), + valueSerializer.snapshotConfiguration(), + versionSerializer.snapshotConfiguration() + ); + } + + @Override + public CompatibilityResult<SharedBuffer<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof SharedBufferSerializerConfigSnapshot) { + TypeSerializerConfigSnapshot[] serializerConfigSnapshots = + ((SharedBufferSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); + + CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(serializerConfigSnapshots[0]); + CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(serializerConfigSnapshots[1]); + CompatibilityResult<DeweyNumber> versionCompatResult = versionSerializer.ensureCompatibility(serializerConfigSnapshots[2]); + + if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration() && 
!versionCompatResult.isRequiresMigration()) { + return CompatibilityResult.compatible(); + } else { + if (keyCompatResult.getConvertDeserializer() != null + && valueCompatResult.getConvertDeserializer() != null + && versionCompatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new SharedBufferSerializer<>( + new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()), + new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()), + new TypeDeserializerAdapter<>(versionCompatResult.getConvertDeserializer()) + )); + } + } + } + + return CompatibilityResult.requiresMigration(null); + } + } + + ////////////////// Java Serialization methods for backwards compatibility ////////////////// + + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(ois); + ArrayList<SharedBufferEntry<K, V>> entryList = new ArrayList<>(); + ois.defaultReadObject(); + + this.pages = new HashMap<>(); + + int numberPages = ois.readInt(); + + for (int i = 0; i < numberPages; i++) { + // key of the page + @SuppressWarnings("unchecked") + K key = (K)ois.readObject(); + + SharedBufferPage<K, V> page = new SharedBufferPage<>(key); + + pages.put(key, page); + + int numberEntries = ois.readInt(); + + for (int j = 0; j < numberEntries; j++) { + // restore the SharedBufferEntries for the given page + V value = valueSerializer.deserialize(source); + long timestamp = ois.readLong(); + + ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp, 0); + SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page); + + sharedBufferEntry.referenceCounter = ois.readInt(); + + page.entries.put(valueTimeWrapper, sharedBufferEntry); + + entryList.add(sharedBufferEntry); + } + } + + // read the edges of the shared buffer entries + int numberEdges = ois.readInt(); + + for (int j = 0; j < 
numberEdges; j++) { + int sourceIndex = ois.readInt(); + int targetIndex = ois.readInt(); + + if (sourceIndex >= entryList.size() || sourceIndex < 0) { + throw new RuntimeException("Could not find source entry with index " + sourceIndex + + ". This indicates a corrupted state."); + } else { + // We've already deserialized the shared buffer entry. Simply read its ID and + // retrieve the buffer entry from the list of entries + SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex); + + final DeweyNumber version = (DeweyNumber) ois.readObject(); + final SharedBufferEntry<K, V> target; + + if (targetIndex >= 0) { + if (targetIndex >= entryList.size()) { + throw new RuntimeException("Could not find target entry with index " + targetIndex + + ". This indicates a corrupted state."); + } else { + target = entryList.get(targetIndex); + } + } else { + target = null; + } + + sourceEntry.edges.add(new SharedBufferEdge<K, V>(target, version)); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java index 3d11538..14395b1 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java @@ -146,6 +146,8 @@ public class State<T> implements Serializable { Stop } + //////////////// Backwards Compatibility //////////////////// + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { ois.defaultReadObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index 39c18b9..1b31485 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -44,6 +44,7 @@ import org.apache.flink.cep.pattern.conditions.BooleanConditions; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.NotCondition; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Preconditions; /** * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a @@ -53,6 +54,8 @@ public class NFACompiler { protected static final String ENDING_STATE_NAME = "$endState$"; + protected static final String STATE_NAME_DELIM = ":"; + /** * Compiles the given pattern into a {@link NFA}. * @@ -71,6 +74,11 @@ public class NFACompiler { return factory.createNFA(); } + public static String getOriginalStateNameFromInternal(String internalName) { + Preconditions.checkNotNull(internalName); + return internalName.split(STATE_NAME_DELIM)[0]; + } + /** * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create * multiple NFAs. @@ -178,10 +186,7 @@ public class NFACompiler { * @return dummy Final state */ private State<T> createEndingState() { - checkPatternNameUniqueness(ENDING_STATE_NAME); - State<T> endState = new State<>(ENDING_STATE_NAME, State.StateType.Final); - states.add(endState); - + State<T> endState = createState(ENDING_STATE_NAME, State.StateType.Final); windowTime = currentPattern.getWindowTime() != null ? 
currentPattern.getWindowTime().toMilliseconds() : 0L; return endState; } @@ -199,7 +204,8 @@ public class NFACompiler { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { //skip notFollow patterns, they are converted into edge conditions } else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) { - final State<T> notNext = createNormalState(); + checkPatternNameUniqueness(currentPattern.getName()); + final State<T> notNext = createState(currentPattern.getName(), State.StateType.Normal); final IterativeCondition<T> notCondition = (IterativeCondition<T>) currentPattern.getCondition(); final State<T> stopState = createStopState(notCondition, currentPattern.getName()); @@ -212,6 +218,7 @@ public class NFACompiler { notNext.addProceed(stopState, notCondition); lastSink = notNext; } else { + checkPatternNameUniqueness(currentPattern.getName()); lastSink = convertPattern(lastSink); } @@ -236,6 +243,7 @@ public class NFACompiler { */ @SuppressWarnings("unchecked") private State<T> createStartState(State<T> sinkState) { + checkPatternNameUniqueness(currentPattern.getName()); final State<T> beginningState = convertPattern(sinkState); beginningState.makeStart(); return beginningState; @@ -243,7 +251,6 @@ public class NFACompiler { private State<T> convertPattern(final State<T> sinkState) { final State<T> lastSink; - checkPatternNameUniqueness(currentPattern.getName()); final Quantifier quantifier = currentPattern.getQuantifier(); if (quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) { @@ -273,18 +280,42 @@ public class NFACompiler { * * @return the created state */ - private State<T> createNormalState() { - final State<T> state = new State<>(currentPattern.getName(), State.StateType.Normal); + private State<T> createState(String name, State.StateType stateType) { + String stateName = getUniqueInternalStateName(name); + usedNames.add(stateName); + State<T> state = 
new State<>(stateName, stateType); states.add(state); return state; } + /** + * Used to give a unique name to states created + * during the translation process. + * + * @param baseName The base of the name. + */ + private String getUniqueInternalStateName(String baseName) { + int counter = 0; + String candidate = baseName; + while (usedNames.contains(candidate)) { + candidate = baseName + STATE_NAME_DELIM + counter++; + } + return candidate; + } + + private void checkPatternNameUniqueness(String patternName) { + if (usedNames.contains(patternName)) { + throw new MalformedPatternException( + "Duplicate pattern name: " + patternName + ". " + + "Pattern names must be unique."); + } + } + private State<T> createStopState(final IterativeCondition<T> notCondition, final String name) { // We should not duplicate the notStates. All states from which we can stop should point to the same one. State<T> stopState = stopStates.get(name); if (stopState == null) { - stopState = new State<>(name, State.StateType.Stop); - states.add(stopState); + stopState = createState(name, State.StateType.Stop); stopState.addTake(notCondition); stopStates.put(name, stopState); } @@ -313,8 +344,7 @@ public class NFACompiler { return sinkState; } - final State<T> copyOfSink = new State<>(sinkState.getName(), sinkState.getStateType()); - states.add(copyOfSink); + final State<T> copyOfSink = createState(sinkState.getName(), sinkState.getStateType()); for (StateTransition<T> tStateTransition : sinkState.getStateTransitions()) { @@ -364,15 +394,6 @@ public class NFACompiler { } } - private void checkPatternNameUniqueness(String patternName) { - if (usedNames.contains(currentPattern.getName())) { - throw new MalformedPatternException( - "Duplicate pattern name: " + patternName + ". 
" + - "Pattern names must be unique."); - } - usedNames.add(patternName); - } - /** * Creates a "complex" state consisting of given number of states with * same {@link IterativeCondition} @@ -396,12 +417,12 @@ public class NFACompiler { return createSingletonState(lastSink, ignoreCondition, false); } - final State<T> singletonState = createNormalState(); + final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal); singletonState.addTake(lastSink, currentCondition); singletonState.addProceed(sinkState, BooleanConditions.<T>trueFunction()); if (ignoreCondition != null) { - State<T> ignoreState = createNormalState(); + State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal); ignoreState.addTake(lastSink, currentCondition); ignoreState.addIgnore(ignoreCondition); singletonState.addIgnore(ignoreState, ignoreCondition); @@ -440,7 +461,7 @@ public class NFACompiler { final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition(); final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction(); - final State<T> singletonState = createNormalState(); + final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal); // if event is accepted then all notPatterns previous to the optional states are no longer valid final State<T> sink = copyWithoutTransitiveNots(sinkState); singletonState.addTake(sink, currentCondition); @@ -453,7 +474,7 @@ public class NFACompiler { if (ignoreCondition != null) { final State<T> ignoreState; if (isOptional) { - ignoreState = createNormalState(); + ignoreState = createState(currentPattern.getName(), State.StateType.Normal); ignoreState.addTake(sink, currentCondition); ignoreState.addIgnore(ignoreCondition); addStopStates(ignoreState); @@ -479,14 +500,14 @@ public class NFACompiler { final IterativeCondition<T> ignoreCondition = getInnerIgnoreCondition(currentPattern); final IterativeCondition<T> 
trueFunction = BooleanConditions.trueFunction(); - final State<T> loopingState = createNormalState(); + final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal); loopingState.addProceed(sinkState, trueFunction); loopingState.addTake(currentCondition); addStopStateToLooping(loopingState); if (ignoreCondition != null) { - final State<T> ignoreState = createNormalState(); + final State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal); ignoreState.addTake(loopingState, currentCondition); ignoreState.addIgnore(ignoreCondition); loopingState.addIgnore(ignoreState, ignoreCondition); @@ -507,7 +528,7 @@ public class NFACompiler { private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) { final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition(); - final State<T> firstState = createNormalState(); + final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal); firstState.addTake(sinkState, currentCondition); final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern); @@ -528,13 +549,13 @@ public class NFACompiler { private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) { final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition(); - final State<T> firstState = createNormalState(); + final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal); firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction()); firstState.addTake(loopingState, currentCondition); final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern); if (ignoreFunction != null) { - final State<T> firstStateWithoutProceed = createNormalState(); + final State<T> firstStateWithoutProceed = createState(currentPattern.getName(), State.StateType.Normal); 
firstState.addIgnore(firstStateWithoutProceed, ignoreFunction); firstStateWithoutProceed.addIgnore(ignoreFunction); firstStateWithoutProceed.addTake(loopingState, currentCondition); http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index bac21b3..2ed7245 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -82,7 +82,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> /////////////// State ////////////// - private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState"; + private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName"; private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName"; private transient ValueState<NFA<IN>> nfaOperatorState; @@ -127,8 +127,8 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> if (nfaOperatorState == null) { nfaOperatorState = getRuntimeContext().getState( new ValueStateDescriptor<>( - NFA_OPERATOR_STATE_NAME, - new NFA.Serializer<IN>())); + NFA_OPERATOR_STATE_NAME, + new NFA.NFASerializer<>(inputSerializer))); } @SuppressWarnings("unchecked,rawtypes") @@ -311,12 +311,20 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> VoidNamespaceSerializer.INSTANCE, this); + // this is with the old serializer so that we can read the state. 
+ ValueState<NFA<IN>> oldNfaOperatorState = getRuntimeContext().getState( + new ValueStateDescriptor<>("nfaOperatorState", new NFA.Serializer<IN>())); + if (migratingFromOldKeyedOperator) { int numberEntries = inputView.readInt(); - for (int i = 0; i <numberEntries; i++) { + for (int i = 0; i < numberEntries; i++) { KEY key = keySerializer.deserialize(inputView); setCurrentKey(key); saveRegisterWatermarkTimer(); + + NFA<IN> nfa = oldNfaOperatorState.value(); + oldNfaOperatorState.clear(); + nfaOperatorState.update(nfa); } } else { http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index 11d193a..2619764 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -20,16 +20,19 @@ package org.apache.flink.cep.nfa; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.BooleanConditions; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.util.ArrayList; import 
java.util.Collection; import java.util.Collections; @@ -185,35 +188,136 @@ public class NFATest extends TestLogger { @Test public void testNFASerialization() throws IOException, ClassNotFoundException { - NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0, false); - - State<Event> startingState = new State<>("", State.StateType.Start); - State<Event> startState = new State<>("start", State.StateType.Normal); - State<Event> endState = new State<>("end", State.StateType.Final); - - - startingState.addTake( - new NameFilter("start")); - startState.addTake( - new NameFilter("end")); - startState.addIgnore(null); - - nfa.addState(startingState); - nfa.addState(startState); - nfa.addState(endState); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - - oos.writeObject(nfa); - - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - ObjectInputStream ois = new ObjectInputStream(bais); - - @SuppressWarnings("unchecked") - NFA<Event> copy = (NFA<Event>) ois.readObject(); - - assertEquals(nfa, copy); + Pattern<Event, ?> pattern1 = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 1858562682635302605L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedByAny("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore().optional().allowCombinations().followedByAny("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + Pattern<Event, ?> pattern2 = Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() { + private static final long serialVersionUID = 1858562682635302605L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("not").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = -6085237016591726715L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore().optional().allowCombinations().followedByAny("end").where(new IterativeCondition<Event>() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context<Event> ctx) throws Exception { + double sum = 0.0; + for (Event e : ctx.getEventsForPattern("middle")) { + sum += e.getPrice(); + } + return sum > 5.0; + } + }); + + Pattern<Event, ?> pattern3 = Pattern.<Event>begin("start") + .notFollowedBy("not").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = -6085237016591726715L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore().allowCombinations().followedByAny("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + List<Pattern<Event, ?>> patterns = new ArrayList<>(); + 
patterns.add(pattern1); + patterns.add(pattern2); + patterns.add(pattern3); + + for (Pattern<Event, ?> p: patterns) { + NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(p, Event.createTypeSerializer(), false); + NFA<Event> nfa = nfaFactory.createNFA(); + + Event a = new Event(40, "a", 1.0); + Event b = new Event(41, "b", 2.0); + Event c = new Event(42, "c", 3.0); + Event b1 = new Event(41, "b", 3.0); + Event b2 = new Event(41, "b", 4.0); + Event b3 = new Event(41, "b", 5.0); + Event d = new Event(43, "d", 4.0); + + nfa.process(a, 1); + nfa.process(b, 2); + nfa.process(c, 3); + nfa.process(b1, 4); + nfa.process(b2, 5); + nfa.process(b3, 6); + nfa.process(d, 7); + nfa.process(a, 8); + + NFA.NFASerializer<Event> serializer = new NFA.NFASerializer<>(Event.createTypeSerializer()); + + //serialize + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializer.serialize(nfa, new DataOutputViewStreamWrapper(baos)); + baos.close(); + + // copy + NFA.NFASerializer<Event> copySerializer = new NFA.NFASerializer<>(Event.createTypeSerializer()); + ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray()); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + copySerializer.copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out)); + in.close(); + out.close(); + + // deserialize + ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray()); + NFA.NFASerializer<Event> deserializer = new NFA.NFASerializer<>(Event.createTypeSerializer()); + NFA<Event> copy = deserializer.deserialize(new DataInputViewStreamWrapper(bais)); + bais.close(); + + assertEquals(nfa, copy); + } } private NFA<Event> createStartEndNFA(long windowLength) { @@ -251,20 +355,4 @@ public class NFATest extends TestLogger { return nfa; } - - private static class NameFilter extends SimpleCondition<Event> { - - private static final long serialVersionUID = 7472112494752423802L; - - private final String name; - - public NameFilter(final 
String name) { - this.name = name; - } - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals(name); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java index ee94b6f..bd828b6 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java @@ -20,7 +20,10 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.cep.Event; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -130,15 +133,14 @@ public class SharedBufferTest extends TestLogger { sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1")); sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0")); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); + SharedBuffer.SharedBufferSerializer serializer = new SharedBuffer.SharedBufferSerializer( + StringSerializer.INSTANCE, Event.createTypeSerializer()); - oos.writeObject(sharedBuffer); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializer.serialize(sharedBuffer, new DataOutputViewStreamWrapper(baos)); ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray()); - ObjectInputStream ois = new ObjectInputStream(bais); - - SharedBuffer<String, Event> copy = (SharedBuffer<String, Event>)ois.readObject(); + SharedBuffer<String, Event> copy = serializer.deserialize(new DataInputViewStreamWrapper(bais)); assertEquals(sharedBuffer, copy); } http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java index d9efb1b..fb05901 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java @@ -103,7 +103,6 @@ public class CEPFrom12MigrationTest { } @Test - @Ignore public void testRestoreAfterBranchingPattern() throws Exception { KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { @@ -171,6 +170,54 @@ public class CEPFrom12MigrationTest { assertEquals(middleEvent2, patternMap2.get("middle").get(0)); assertEquals(endEvent, patternMap2.get("end").get(0)); + // and now go for a checkpoint with the new serializers + + final Event startEvent1 = new Event(42, "start", 2.0); + final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0); + final Event endEvent1 = new Event(42, "end", 2.0); + + harness.processElement(new StreamRecord<Event>(startEvent1, 21)); + harness.processElement(new StreamRecord<Event>(middleEvent3, 23)); + + // simulate snapshot/restore with some elements in internal sorting queue + OperatorStateHandles snapshot = harness.snapshot(1L, 1L); + harness.close(); + + harness = new KeyedOneInputStreamOperatorTestHarness<>( + new 
KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(endEvent1, 25)); + + harness.processWatermark(new Watermark(50)); + + result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject3 = result.poll(); + assertTrue(resultObject3 instanceof StreamRecord); + StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3; + assertTrue(resultRecord3.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, List<Event>> patternMap3 = (Map<String, List<Event>>) resultRecord3.getValue(); + + assertEquals(startEvent1, patternMap3.get("start").get(0)); + assertEquals(middleEvent3, patternMap3.get("middle").get(0)); + assertEquals(endEvent1, patternMap3.get("end").get(0)); + harness.close(); } @@ -220,7 +267,6 @@ public class CEPFrom12MigrationTest { } @Test - @Ignore public void testRestoreStartingNewPatternAfterMigration() throws Exception { KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { @@ -302,6 +348,54 @@ public class CEPFrom12MigrationTest { assertEquals(middleEvent2, patternMap3.get("middle").get(0)); assertEquals(endEvent, patternMap3.get("end").get(0)); + // and now go for a checkpoint with the new serializers + + final Event startEvent3 = new Event(42, "start", 2.0); + final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0); + final Event endEvent1 = new Event(42, "end", 2.0); + + harness.processElement(new StreamRecord<Event>(startEvent3, 21)); + harness.processElement(new StreamRecord<Event>(middleEvent3, 23)); + + // simulate snapshot/restore with some elements in internal sorting queue + OperatorStateHandles snapshot = harness.snapshot(1L, 1L); + harness.close(); + + harness = new 
KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(endEvent1, 25)); + + harness.processWatermark(new Watermark(50)); + + result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject4 = result.poll(); + assertTrue(resultObject4 instanceof StreamRecord); + StreamRecord<?> resultRecord4 = (StreamRecord<?>) resultObject4; + assertTrue(resultRecord4.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, List<Event>> patternMap4 = (Map<String, List<Event>>) resultRecord4.getValue(); + + assertEquals(startEvent3, patternMap4.get("start").get(0)); + assertEquals(middleEvent3, patternMap4.get("middle").get(0)); + assertEquals(endEvent1, patternMap4.get("end").get(0)); + harness.close(); } @@ -347,7 +441,6 @@ public class CEPFrom12MigrationTest { @Test - @Ignore public void testSinglePatternAfterMigration() throws Exception { KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {