Repository: flink
Updated Branches:
  refs/heads/master 57333c622 -> ff9cefb36


[FLINK-7835][cep] Fix duplicate() in NFASerializer.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff9cefb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff9cefb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff9cefb3

Branch: refs/heads/master
Commit: ff9cefb36c70a9b6c55f607fc2b56644c57f7057
Parents: 57333c6
Author: kkloudas <kklou...@gmail.com>
Authored: Thu Oct 12 15:20:32 2017 +0200
Committer: kkloudas <kklou...@gmail.com>
Committed: Fri Oct 13 13:49:58 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/NonDuplicatingTypeSerializer.java |   5 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 154 +------------------
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  62 ++++----
 .../java/org/apache/flink/cep/nfa/NFATest.java  |   5 +-
 .../apache/flink/cep/nfa/SharedBufferTest.java  |   2 +-
 .../flink/cep/operator/CEPOperatorTest.java     |  11 ++
 6 files changed, 57 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
index f9e13fe..0978aa1 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.IdentityHashMap;
+import java.util.List;
 
 /**
  * Type serializer which keeps track of the serialized objects so that each 
object is only
@@ -53,7 +54,7 @@ public final class NonDuplicatingTypeSerializer<T> extends 
TypeSerializer<T> {
        private transient IdentityHashMap<T, Integer> identityMap;
 
        // here we store the already deserialized objects
-       private transient ArrayList<T> elementList;
+       private transient List<T> elementList;
 
        public NonDuplicatingTypeSerializer(final TypeSerializer<T> 
typeSerializer) {
                this.typeSerializer = typeSerializer;
@@ -82,7 +83,7 @@ public final class NonDuplicatingTypeSerializer<T> extends 
TypeSerializer<T> {
 
        @Override
        public TypeSerializer<T> duplicate() {
-               return new NonDuplicatingTypeSerializer<>(typeSerializer);
+               return new 
NonDuplicatingTypeSerializer<>(typeSerializer.duplicate());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index ff4967f..7092d73 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -28,9 +28,7 @@ import 
org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.EnumSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
@@ -48,7 +46,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.OptionalDataException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -489,7 +486,6 @@ public class NFA<T> implements Serializable {
                }
        }
 
-
        /**
         * Computes the next computation states based on the given computation 
state, the current event,
         * its timestamp and the internal state machine. The algorithm is:
@@ -793,53 +789,6 @@ public class NFA<T> implements Serializable {
                return result;
        }
 
-       //////////////////////                  Fault-Tolerance                 
//////////////////////
-
-       private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
-               ois.defaultReadObject();
-
-               int numberComputationStates = ois.readInt();
-
-               computationStates = new LinkedList<>();
-
-               final List<ComputationState<T>> readComputationStates = new 
ArrayList<>(numberComputationStates);
-
-               for (int i = 0; i < numberComputationStates; i++) {
-                       ComputationState<T> computationState = 
readComputationState(ois);
-                       readComputationStates.add(computationState);
-               }
-
-               this.computationStates.addAll(readComputationStates);
-               nonDuplicatingTypeSerializer.clearReferences();
-       }
-
-       @SuppressWarnings("unchecked")
-       private ComputationState<T> readComputationState(ObjectInputStream ois) 
throws IOException, ClassNotFoundException {
-               final State<T> state = (State<T>) ois.readObject();
-               State<T> previousState;
-               try {
-                       previousState = (State<T>) ois.readObject();
-               } catch (OptionalDataException e) {
-                       previousState = null;
-               }
-
-               final long timestamp = ois.readLong();
-               final DeweyNumber version = (DeweyNumber) ois.readObject();
-               final long startTimestamp = ois.readLong();
-
-               final boolean hasEvent = ois.readBoolean();
-               final T event;
-
-               if (hasEvent) {
-                       DataInputViewStreamWrapper input = new 
DataInputViewStreamWrapper(ois);
-                       event = nonDuplicatingTypeSerializer.deserialize(input);
-               } else {
-                       event = null;
-               }
-
-               return ComputationState.createState(this, state, previousState, 
event, 0, timestamp, version, startTimestamp);
-       }
-
        //////////////////////                  New Serialization               
        //////////////////////
 
        /**
@@ -893,8 +842,8 @@ public class NFA<T> implements Serializable {
                }
 
                @Override
-               public TypeSerializer<NFA<T>> duplicate() {
-                       return this;
+               public NFASerializer<T> duplicate() {
+                       return new NFASerializer<>(eventSerializer.duplicate());
                }
 
                @Override
@@ -906,21 +855,13 @@ public class NFA<T> implements Serializable {
                public NFA<T> copy(NFA<T> from) {
                        try {
                                ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                               ObjectOutputStream oos = new 
ObjectOutputStream(baos);
-
-                               serialize(from, new 
DataOutputViewStreamWrapper(oos));
-
-                               oos.close();
+                               serialize(from, new 
DataOutputViewStreamWrapper(baos));
                                baos.close();
 
                                byte[] data = baos.toByteArray();
 
                                ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-                               ObjectInputStream ois = new 
ObjectInputStream(bais);
-
-                               @SuppressWarnings("unchecked")
-                               NFA<T> copy = deserialize(new 
DataInputViewStreamWrapper(ois));
-                               ois.close();
+                               NFA<T> copy = deserialize(new 
DataInputViewStreamWrapper(bais));
                                bais.close();
                                return copy;
                        } catch (IOException e) {
@@ -1236,91 +1177,4 @@ public class NFA<T> implements Serializable {
                        return null;
                }
        }
-
-       //////////////////                      Old Serialization               
        //////////////////////
-
-       /**
-        * A {@link TypeSerializer} for {@link NFA} that uses Java 
Serialization.
-        */
-       public static class Serializer<T> extends 
TypeSerializerSingleton<NFA<T>> {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public boolean isImmutableType() {
-                       return false;
-               }
-
-               @Override
-               public NFA<T> createInstance() {
-                       return null;
-               }
-
-               @Override
-               public NFA<T> copy(NFA<T> from) {
-                       try {
-                               ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                               ObjectOutputStream oos = new 
ObjectOutputStream(baos);
-
-                               oos.writeObject(from);
-
-                               oos.close();
-                               baos.close();
-
-                               byte[] data = baos.toByteArray();
-
-                               ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-                               ObjectInputStream ois = new 
ObjectInputStream(bais);
-
-                               @SuppressWarnings("unchecked")
-                               NFA<T> copy = (NFA<T>) ois.readObject();
-                               ois.close();
-                               bais.close();
-                               return copy;
-                       } catch (IOException | ClassNotFoundException e) {
-                               throw new RuntimeException("Could not copy 
NFA.", e);
-                       }
-               }
-
-               @Override
-               public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
-                       return copy(from);
-               }
-
-               @Override
-               public int getLength() {
-                       return 0;
-               }
-
-               @Override
-               public void serialize(NFA<T> record, DataOutputView target) 
throws IOException {
-                       throw new UnsupportedOperationException("This is the 
deprecated serialization strategy.");
-               }
-
-               @Override
-               public NFA<T> deserialize(DataInputView source) throws 
IOException {
-                       try (ObjectInputStream ois = new ObjectInputStream(new 
DataInputViewStream(source))) {
-                               return (NFA<T>) ois.readObject();
-                       } catch (ClassNotFoundException e) {
-                               throw new RuntimeException("Could not 
deserialize NFA.", e);
-                       }
-               }
-
-               @Override
-               public NFA<T> deserialize(NFA<T> reuse, DataInputView source) 
throws IOException {
-                       return deserialize(source);
-               }
-
-               @Override
-               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
-                       int size = source.readInt();
-                       target.writeInt(size);
-                       target.write(source, size);
-               }
-
-               @Override
-               public boolean canEqual(Object obj) {
-                       return obj instanceof Serializer;
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 6bc5091..0cf47ca 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -38,8 +38,6 @@ import org.apache.commons.lang3.StringUtils;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -829,40 +827,44 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                        this.versionSerializer = versionSerializer;
                }
 
+               public TypeSerializer<DeweyNumber> getVersionSerializer() {
+                       return versionSerializer;
+               }
+
+               public TypeSerializer<K> getKeySerializer() {
+                       return keySerializer;
+               }
+
+               public TypeSerializer<V> getValueSerializer() {
+                       return valueSerializer;
+               }
+
                @Override
                public boolean isImmutableType() {
                        return false;
                }
 
                @Override
-               public TypeSerializer<SharedBuffer<K, V>> duplicate() {
-                       return new SharedBufferSerializer<>(keySerializer, 
valueSerializer);
+               public SharedBufferSerializer<K, V> duplicate() {
+                       return new 
SharedBufferSerializer<>(keySerializer.duplicate(), 
valueSerializer.duplicate());
                }
 
                @Override
                public SharedBuffer<K, V> createInstance() {
-                       return new SharedBuffer<>(new 
NonDuplicatingTypeSerializer<V>(valueSerializer));
+                       return new SharedBuffer<>(new 
NonDuplicatingTypeSerializer<>(valueSerializer.duplicate()));
                }
 
                @Override
-               public SharedBuffer<K, V> copy(SharedBuffer from) {
+               public SharedBuffer<K, V> copy(SharedBuffer<K, V> from) {
                        try {
                                ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                               ObjectOutputStream oos = new 
ObjectOutputStream(baos);
-
-                               serialize(from, new 
DataOutputViewStreamWrapper(oos));
-
-                               oos.close();
+                               serialize(from, new 
DataOutputViewStreamWrapper(baos));
                                baos.close();
 
                                byte[] data = baos.toByteArray();
 
                                ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-                               ObjectInputStream ois = new 
ObjectInputStream(bais);
-
-                               @SuppressWarnings("unchecked")
-                               SharedBuffer<K, V> copy = deserialize(new 
DataInputViewStreamWrapper(ois));
-                               ois.close();
+                               SharedBuffer<K, V> copy = deserialize(new 
DataInputViewStreamWrapper(bais));
                                bais.close();
 
                                return copy;
@@ -872,7 +874,7 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                }
 
                @Override
-               public SharedBuffer<K, V> copy(SharedBuffer from, SharedBuffer 
reuse) {
+               public SharedBuffer<K, V> copy(SharedBuffer<K, V> from, 
SharedBuffer<K, V> reuse) {
                        return copy(from);
                }
 
@@ -882,7 +884,7 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                }
 
                @Override
-               public void serialize(SharedBuffer record, DataOutputView 
target) throws IOException {
+               public void serialize(SharedBuffer<K, V> record, DataOutputView 
target) throws IOException {
                        Map<K, SharedBufferPage<K, V>> pages = record.pages;
                        Map<SharedBufferEntry<K, V>, Integer> entryIDs = new 
HashMap<>();
 
@@ -955,7 +957,7 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                }
 
                @Override
-               public SharedBuffer deserialize(DataInputView source) throws 
IOException {
+               public SharedBuffer<K, V> deserialize(DataInputView source) 
throws IOException {
                        List<SharedBufferEntry<K, V>> entryList = new 
ArrayList<>();
                        Map<K, SharedBufferPage<K, V>> pages = new HashMap<>();
 
@@ -1013,11 +1015,11 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                        // here we put the old NonDuplicating serializer 
because this needs to create a copy
                        // of the buffer, as created by the NFA. There, for 
compatibility reasons, we have left
                        // the old serializer.
-                       return new SharedBuffer(new 
NonDuplicatingTypeSerializer(valueSerializer), pages);
+                       return new SharedBuffer<>(new 
NonDuplicatingTypeSerializer<>(valueSerializer), pages);
                }
 
                @Override
-               public SharedBuffer deserialize(SharedBuffer reuse, 
DataInputView source) throws IOException {
+               public SharedBuffer<K, V> deserialize(SharedBuffer<K, V> reuse, 
DataInputView source) throws IOException {
                        return deserialize(source);
                }
 
@@ -1068,11 +1070,19 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
 
                @Override
                public boolean equals(Object obj) {
-                       return obj == this ||
-                                       (obj != null && 
obj.getClass().equals(getClass()) &&
-                                                       
keySerializer.equals(((SharedBufferSerializer<?, ?>) obj).keySerializer) &&
-                                                       
valueSerializer.equals(((SharedBufferSerializer<?, ?>) obj).valueSerializer) &&
-                                                       
versionSerializer.equals(((SharedBufferSerializer<?, ?>) 
obj).versionSerializer));
+                       if (obj == this) {
+                               return true;
+                       }
+
+                       if (obj == null || !Objects.equals(obj.getClass(), 
getClass())) {
+                               return false;
+                       }
+
+                       SharedBufferSerializer other = (SharedBufferSerializer) 
obj;
+                       return
+                                       Objects.equals(keySerializer, 
other.getKeySerializer()) &&
+                                       Objects.equals(valueSerializer, 
other.getValueSerializer()) &&
+                                       Objects.equals(versionSerializer, 
other.getVersionSerializer());
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 8e739c3..0f4066f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -310,14 +310,13 @@ public class NFATest extends TestLogger {
                        NFA.NFASerializer<Event> copySerializer = new 
NFA.NFASerializer<>(Event.createTypeSerializer());
                        ByteArrayInputStream in = new 
ByteArrayInputStream(baos.toByteArray());
                        ByteArrayOutputStream out = new ByteArrayOutputStream();
-                       copySerializer.copy(new DataInputViewStreamWrapper(in), 
new DataOutputViewStreamWrapper(out));
+                       copySerializer.duplicate().copy(new 
DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
                        in.close();
                        out.close();
 
                        // deserialize
                        ByteArrayInputStream bais = new 
ByteArrayInputStream(out.toByteArray());
-                       NFA.NFASerializer<Event> deserializer = new 
NFA.NFASerializer<>(Event.createTypeSerializer());
-                       NFA<Event> copy = deserializer.deserialize(new 
DataInputViewStreamWrapper(bais));
+                       NFA<Event> copy = 
serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
                        bais.close();
 
                        assertEquals(nfa, copy);

http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index dfbfa5f..51d27e0 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -160,7 +160,7 @@ public class SharedBufferTest extends TestLogger {
                serializer.serialize(sharedBuffer, new 
DataOutputViewStreamWrapper(baos));
 
                ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
-               SharedBuffer<String, Event> copy = serializer.deserialize(new 
DataInputViewStreamWrapper(bais));
+               SharedBuffer<String, Event> copy = 
serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
 
                assertEquals(sharedBuffer, copy);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index ed8b923..4fa6d09 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -259,12 +259,16 @@ public class CEPOperatorTest extends TestLogger {
                                        null,
                                        null,
                                        new PatternSelectFunction<Event, 
Map<String, List<Event>>>() {
+                                               private static final long 
serialVersionUID = -5768297287711394420L;
+
                                                @Override
                                                public Map<String, List<Event>> 
select(Map<String, List<Event>> pattern) throws Exception {
                                                        return pattern;
                                                }
                                        },
                                        new PatternTimeoutFunction<Event, 
Tuple2<Map<String, List<Event>>, Long>>() {
+                                               private static final long 
serialVersionUID = 2843329425823093249L;
+
                                                @Override
                                                public Tuple2<Map<String, 
List<Event>>, Long> timeout(
                                                        Map<String, 
List<Event>> pattern,
@@ -274,6 +278,8 @@ public class CEPOperatorTest extends TestLogger {
                                        },
                                        timedOut
                                ), new KeySelector<Event, Integer>() {
+                               private static final long serialVersionUID = 
7219185117566268366L;
+
                                @Override
                                public Integer getKey(Event value) throws 
Exception {
                                        return value.getId();
@@ -281,6 +287,11 @@ public class CEPOperatorTest extends TestLogger {
                        }, BasicTypeInfo.INT_TYPE_INFO);
 
                try {
+                       String rocksDbPath = 
tempFolder.newFolder().getAbsolutePath();
+                       RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend(new MemoryStateBackend());
+                       rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+
+                       harness.setStateBackend(rocksDBStateBackend);
                        harness.setup(
                                new KryoSerializer<>(
                                        (Class<Map<String, List<Event>>>) 
(Object) Map.class,

Reply via email to