Repository: flink Updated Branches: refs/heads/master 57333c622 -> ff9cefb36
[FLINK-7835][cep] Fix duplicate() in NFASerializer. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff9cefb3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff9cefb3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff9cefb3 Branch: refs/heads/master Commit: ff9cefb36c70a9b6c55f607fc2b56644c57f7057 Parents: 57333c6 Author: kkloudas <kklou...@gmail.com> Authored: Thu Oct 12 15:20:32 2017 +0200 Committer: kkloudas <kklou...@gmail.com> Committed: Fri Oct 13 13:49:58 2017 +0200 ---------------------------------------------------------------------- .../flink/cep/NonDuplicatingTypeSerializer.java | 5 +- .../main/java/org/apache/flink/cep/nfa/NFA.java | 154 +------------------ .../org/apache/flink/cep/nfa/SharedBuffer.java | 62 ++++---- .../java/org/apache/flink/cep/nfa/NFATest.java | 5 +- .../apache/flink/cep/nfa/SharedBufferTest.java | 2 +- .../flink/cep/operator/CEPOperatorTest.java | 11 ++ 6 files changed, 57 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java index f9e13fe..0978aa1 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.util.ArrayList; import java.util.IdentityHashMap; +import java.util.List; /** * Type serializer which keeps track of the serialized objects so that each object is only @@ -53,7 +54,7 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> { private transient IdentityHashMap<T, Integer> identityMap; // here we store the already deserialized objects - private transient ArrayList<T> elementList; + private transient List<T> elementList; public NonDuplicatingTypeSerializer(final TypeSerializer<T> typeSerializer) { this.typeSerializer = typeSerializer; @@ -82,7 +83,7 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> { @Override public TypeSerializer<T> duplicate() { - return new NonDuplicatingTypeSerializer<>(typeSerializer); + return new NonDuplicatingTypeSerializer<>(typeSerializer.duplicate()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index ff4967f..7092d73 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -28,9 +28,7 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.common.typeutils.base.EnumSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.cep.NonDuplicatingTypeSerializer; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler; @@ -48,7 +46,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.io.OptionalDataException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -489,7 +486,6 @@ public class NFA<T> implements Serializable { } } - /** * Computes the next computation states based on the given computation state, the current event, * its timestamp and the internal state machine. The algorithm is: @@ -793,53 +789,6 @@ public class NFA<T> implements Serializable { return result; } - ////////////////////// Fault-Tolerance ////////////////////// - - private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { - ois.defaultReadObject(); - - int numberComputationStates = ois.readInt(); - - computationStates = new LinkedList<>(); - - final List<ComputationState<T>> readComputationStates = new ArrayList<>(numberComputationStates); - - for (int i = 0; i < numberComputationStates; i++) { - ComputationState<T> computationState = readComputationState(ois); - readComputationStates.add(computationState); - } - - this.computationStates.addAll(readComputationStates); - nonDuplicatingTypeSerializer.clearReferences(); - } - - @SuppressWarnings("unchecked") - private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException { - final State<T> state = (State<T>) ois.readObject(); - State<T> previousState; - try { - previousState = (State<T>) ois.readObject(); - } catch (OptionalDataException e) { - previousState = null; - } - - final long timestamp = ois.readLong(); - final DeweyNumber version = (DeweyNumber) ois.readObject(); - final long startTimestamp = ois.readLong(); - - final boolean hasEvent = ois.readBoolean(); - final T event; - - if (hasEvent) { - DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(ois); - event = nonDuplicatingTypeSerializer.deserialize(input); - } else { - event = null; - } - - return ComputationState.createState(this, state, previousState, event, 0, timestamp, version, startTimestamp); - } - ////////////////////// New Serialization ////////////////////// /** @@ -893,8 +842,8 @@ public class NFA<T> implements Serializable { } @Override - public TypeSerializer<NFA<T>> duplicate() { - return this; + public NFASerializer<T> duplicate() { + return new NFASerializer<>(eventSerializer.duplicate()); } @Override @@ -906,21 +855,13 @@ public class NFA<T> implements Serializable { public NFA<T> copy(NFA<T> from) { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - - serialize(from, new DataOutputViewStreamWrapper(oos)); - - oos.close(); + serialize(from, new DataOutputViewStreamWrapper(baos)); baos.close(); byte[] data = baos.toByteArray(); ByteArrayInputStream bais = new ByteArrayInputStream(data); - ObjectInputStream ois = new ObjectInputStream(bais); - - @SuppressWarnings("unchecked") - NFA<T> copy = deserialize(new DataInputViewStreamWrapper(ois)); - ois.close(); + NFA<T> copy = deserialize(new DataInputViewStreamWrapper(bais)); bais.close(); return copy; } catch (IOException e) { @@ -1236,91 +1177,4 @@ public class NFA<T> implements Serializable { return null; } } - - ////////////////// Old Serialization ////////////////////// - - /** - * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization. - */ - public static class Serializer<T> extends TypeSerializerSingleton<NFA<T>> { - - private static final long serialVersionUID = 1L; - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public NFA<T> createInstance() { - return null; - } - - @Override - public NFA<T> copy(NFA<T> from) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - - oos.writeObject(from); - - oos.close(); - baos.close(); - - byte[] data = baos.toByteArray(); - - ByteArrayInputStream bais = new ByteArrayInputStream(data); - ObjectInputStream ois = new ObjectInputStream(bais); - - @SuppressWarnings("unchecked") - NFA<T> copy = (NFA<T>) ois.readObject(); - ois.close(); - bais.close(); - return copy; - } catch (IOException | ClassNotFoundException e) { - throw new RuntimeException("Could not copy NFA.", e); - } - } - - @Override - public NFA<T> copy(NFA<T> from, NFA<T> reuse) { - return copy(from); - } - - @Override - public int getLength() { - return 0; - } - - @Override - public void serialize(NFA<T> record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException("This is the deprecated serialization strategy."); - } - - @Override - public NFA<T> deserialize(DataInputView source) throws IOException { - try (ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source))) { - return (NFA<T>) ois.readObject(); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Could not deserialize NFA.", e); - } - } - - @Override - public NFA<T> deserialize(NFA<T> reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - int size = source.readInt(); - target.writeInt(size); - target.write(source, size); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof Serializer; - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index 6bc5091..0cf47ca 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -38,8 +38,6 @@ import org.apache.commons.lang3.StringUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -829,40 +827,44 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { this.versionSerializer = versionSerializer; } + public TypeSerializer<DeweyNumber> getVersionSerializer() { + return versionSerializer; + } + + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + public TypeSerializer<V> getValueSerializer() { + return valueSerializer; + } + @Override public boolean isImmutableType() { return false; } @Override - public TypeSerializer<SharedBuffer<K, V>> duplicate() { - return new SharedBufferSerializer<>(keySerializer, valueSerializer); + public SharedBufferSerializer<K, V> duplicate() { + return new SharedBufferSerializer<>(keySerializer.duplicate(), valueSerializer.duplicate()); } @Override public SharedBuffer<K, V> createInstance() { - return new SharedBuffer<>(new NonDuplicatingTypeSerializer<V>(valueSerializer)); + return new SharedBuffer<>(new NonDuplicatingTypeSerializer<>(valueSerializer.duplicate())); } @Override - public SharedBuffer<K, V> copy(SharedBuffer from) { + public SharedBuffer<K, V> copy(SharedBuffer<K, V> from) { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - - serialize(from, new DataOutputViewStreamWrapper(oos)); - - oos.close(); + serialize(from, new DataOutputViewStreamWrapper(baos)); baos.close(); byte[] data = baos.toByteArray(); ByteArrayInputStream bais = new ByteArrayInputStream(data); - ObjectInputStream ois = new ObjectInputStream(bais); - - @SuppressWarnings("unchecked") - SharedBuffer<K, V> copy = deserialize(new DataInputViewStreamWrapper(ois)); - ois.close(); + SharedBuffer<K, V> copy = deserialize(new DataInputViewStreamWrapper(bais)); bais.close(); return copy; @@ -872,7 +874,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { } @Override - public SharedBuffer<K, V> copy(SharedBuffer from, SharedBuffer reuse) { + public SharedBuffer<K, V> copy(SharedBuffer<K, V> from, SharedBuffer<K, V> reuse) { return copy(from); } @@ -882,7 +884,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { } @Override - public void serialize(SharedBuffer record, DataOutputView target) throws IOException { + public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException { Map<K, SharedBufferPage<K, V>> pages = record.pages; Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>(); @@ -955,7 +957,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { } @Override - public SharedBuffer deserialize(DataInputView source) throws IOException { + public SharedBuffer<K, V> deserialize(DataInputView source) throws IOException { List<SharedBufferEntry<K, V>> entryList = new ArrayList<>(); Map<K, SharedBufferPage<K, V>> pages = new HashMap<>(); @@ -1013,11 +1015,11 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { // here we put the old NonDuplicating serializer because this needs to create a copy // of the buffer, as created by the NFA. There, for compatibility reasons, we have left // the old serializer. - return new SharedBuffer(new NonDuplicatingTypeSerializer(valueSerializer), pages); + return new SharedBuffer<>(new NonDuplicatingTypeSerializer<>(valueSerializer), pages); } @Override - public SharedBuffer deserialize(SharedBuffer reuse, DataInputView source) throws IOException { + public SharedBuffer<K, V> deserialize(SharedBuffer<K, V> reuse, DataInputView source) throws IOException { return deserialize(source); } @@ -1068,11 +1070,19 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { @Override public boolean equals(Object obj) { - return obj == this || - (obj != null && obj.getClass().equals(getClass()) && - keySerializer.equals(((SharedBufferSerializer<?, ?>) obj).keySerializer) && - valueSerializer.equals(((SharedBufferSerializer<?, ?>) obj).valueSerializer) && - versionSerializer.equals(((SharedBufferSerializer<?, ?>) obj).versionSerializer)); + if (obj == this) { + return true; + } + + if (obj == null || !Objects.equals(obj.getClass(), getClass())) { + return false; + } + + SharedBufferSerializer other = (SharedBufferSerializer) obj; + return + Objects.equals(keySerializer, other.getKeySerializer()) && + Objects.equals(valueSerializer, other.getValueSerializer()) && + Objects.equals(versionSerializer, other.getVersionSerializer()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index 8e739c3..0f4066f 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -310,14 +310,13 @@ public class NFATest extends TestLogger { NFA.NFASerializer<Event> copySerializer = new NFA.NFASerializer<>(Event.createTypeSerializer()); ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray()); ByteArrayOutputStream out = new ByteArrayOutputStream(); - copySerializer.copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out)); + copySerializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out)); in.close(); out.close(); // deserialize ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray()); - NFA.NFASerializer<Event> deserializer = new NFA.NFASerializer<>(Event.createTypeSerializer()); - NFA<Event> copy = deserializer.deserialize(new DataInputViewStreamWrapper(bais)); + NFA<Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais)); bais.close(); assertEquals(nfa, copy); http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java index dfbfa5f..51d27e0 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java @@ -160,7 +160,7 @@ public class SharedBufferTest extends TestLogger { serializer.serialize(sharedBuffer, new DataOutputViewStreamWrapper(baos)); ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - SharedBuffer<String, Event> copy = serializer.deserialize(new DataInputViewStreamWrapper(bais)); + SharedBuffer<String, Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais)); assertEquals(sharedBuffer, copy); } http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index ed8b923..4fa6d09 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -259,12 +259,16 @@ public class CEPOperatorTest extends TestLogger { null, null, new PatternSelectFunction<Event, Map<String, List<Event>>>() { + private static final long serialVersionUID = -5768297287711394420L; + @Override public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception { return pattern; } }, new PatternTimeoutFunction<Event, Tuple2<Map<String, List<Event>>, Long>>() { + private static final long serialVersionUID = 2843329425823093249L; + @Override public Tuple2<Map<String, List<Event>>, Long> timeout( Map<String, List<Event>> pattern, @@ -274,6 +278,8 @@ public class CEPOperatorTest extends TestLogger { }, timedOut ), new KeySelector<Event, Integer>() { + private static final long serialVersionUID = 7219185117566268366L; + @Override public Integer getKey(Event value) throws Exception { return value.getId(); @@ -281,6 +287,11 @@ public class CEPOperatorTest extends TestLogger { }, BasicTypeInfo.INT_TYPE_INFO); try { + String rocksDbPath = tempFolder.newFolder().getAbsolutePath(); + RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); + rocksDBStateBackend.setDbStoragePath(rocksDbPath); + + harness.setStateBackend(rocksDBStateBackend); harness.setup( new KryoSerializer<>( (Class<Map<String, List<Event>>>) (Object) Map.class,