This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff00541ec2dc04db1129a3db7dacbbf307622cc1
Author: Igal Shilman <[email protected]>
AuthorDate: Wed Feb 27 21:13:52 2019 +0100

    [FLINK-11773] [core] Use LinkedOptionalMapSerializer in 
Kryo-/PojoSerializerSnapshotData
---
 .../runtime/PojoSerializerSnapshotData.java        |  65 +------
 .../runtime/kryo/KryoSerializerSnapshotData.java   | 208 ++++++++++-----------
 2 files changed, 107 insertions(+), 166 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotData.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotData.java
index 8999d77..c0f1100 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotData.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotData.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.LinkedOptionalMap;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.BiFunctionWithException;
-import org.apache.flink.util.function.FunctionWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +39,8 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.apache.flink.util.LinkedOptionalMap.optionalMapOf;
+import static 
org.apache.flink.util.LinkedOptionalMapSerializer.readOptionalMap;
+import static 
org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -220,67 +221,11 @@ final class PojoSerializerSnapshotData<T> {
                return String.format("missing-field-at-%d", fieldIndex);
        }
 
-       private static <K, V> void writeOptionalMap(
-               DataOutputView out,
-               LinkedOptionalMap<K, V> map,
-               BiConsumerWithException<DataOutputView, K, IOException> 
keyWriter,
-               BiConsumerWithException<DataOutputView, V, IOException> 
valueWriter) throws IOException {
-
-               out.writeInt(map.size());
-               map.forEach(((keyName, key, value) -> {
-                       out.writeUTF(keyName);
-
-                       if (key == null) {
-                               out.writeBoolean(false);
-                       } else {
-                               out.writeBoolean(true);
-                               keyWriter.accept(out, key);
-                       }
-
-                       if (value == null) {
-                               out.writeBoolean(false);
-                       } else {
-                               out.writeBoolean(true);
-                               valueWriter.accept(out, value);
-                       }
-               }));
-       }
-
-       private static <K, V> LinkedOptionalMap<K, V> readOptionalMap(
-               DataInputView in,
-               BiFunctionWithException<DataInputView, String, K, IOException> 
keyReader,
-               FunctionWithException<DataInputView, V, IOException> 
valueReader) throws IOException {
-
-               long mapSize = in.readInt();
-
-               LinkedOptionalMap<K, V> map = new LinkedOptionalMap<>();
-               for (int i = 0; i < mapSize; i++) {
-                       String keyName = in.readUTF();
-
-                       final K key;
-                       if (in.readBoolean()) {
-                               key = keyReader.apply(in, keyName);
-                       } else {
-                               key = null;
-                       }
-
-                       final V value;
-                       if (in.readBoolean()) {
-                               value = valueReader.apply(in);
-                       } else {
-                               value = null;
-                       }
-
-                       map.put(keyName, key, value);
-               }
-               return map;
-       }
-
        private enum NoOpWriter implements 
BiConsumerWithException<DataOutputView, Object, IOException> {
                INSTANCE;
 
                @Override
-               public void accept(DataOutputView dataOutputView, Object o) 
throws IOException {}
+               public void accept(DataOutputView dataOutputView, Object o) {}
 
                @SuppressWarnings("unchecked")
                static <K> BiConsumerWithException<DataOutputView, K, 
IOException> noopWriter() {
@@ -300,8 +245,8 @@ final class PojoSerializerSnapshotData<T> {
                };
        }
 
-       private static FunctionWithException<DataInputView, 
TypeSerializerSnapshot<?>, IOException> snapshotReader(ClassLoader cl) {
-               return input -> {
+       private static BiFunctionWithException<DataInputView, String, 
TypeSerializerSnapshot<?>, IOException> snapshotReader(ClassLoader cl) {
+               return (input, unused) -> {
                        try {
                                return 
TypeSerializerSnapshot.readVersionedSnapshot(input, cl);
                        }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java
index f04c8e8..16da1f9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java
@@ -26,18 +26,21 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.LinkedOptionalMap;
+import org.apache.flink.util.function.BiFunctionWithException;
 
 import com.esotericsoftware.kryo.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InvalidClassException;
 import java.util.LinkedHashMap;
-import java.util.Map.Entry;
 import java.util.function.Function;
 
 import static org.apache.flink.util.LinkedOptionalMap.optionalMapOf;
+import static 
org.apache.flink.util.LinkedOptionalMapSerializer.readOptionalMap;
+import static 
org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 final class KryoSerializerSnapshotData<T> {
@@ -135,28 +138,26 @@ final class KryoSerializerSnapshotData<T> {
                DataOutputView out,
                LinkedOptionalMap<String, KryoRegistration> kryoRegistrations) 
throws IOException {
 
-               out.writeInt(kryoRegistrations.size());
-               for (Entry<String, KryoRegistration> entry : 
kryoRegistrations.unwrapOptionals().entrySet()) {
-                       out.writeUTF(entry.getKey());
-                       
KryoRegistrationUtil.writeKryoRegistration(entry.getValue(), out);
-               }
+               writeOptionalMap(
+                       out,
+                       kryoRegistrations,
+                       DataOutput::writeUTF,
+                       KryoRegistrationUtil::writeKryoRegistration);
        }
 
        private void writeDefaultKryoSerializers(
                DataOutputView out,
-               LinkedOptionalMap<Class<?>,
-                       SerializableSerializer<?>> defaultKryoSerializers) 
throws IOException {
-
-               out.writeInt(defaultKryoSerializers.size());
-               for (Entry<Class<?>, SerializableSerializer<?>> entry : 
defaultKryoSerializers.unwrapOptionals().entrySet()) {
-                       Class<?> javaClass = entry.getKey();
-                       SerializableSerializer<?> serializerInstance = 
entry.getValue();
-
-                       out.writeUTF(javaClass.getName());
-                       try (final DataOutputViewStream outViewWrapper = new 
DataOutputViewStream(out)) {
-                               
InstantiationUtil.serializeObject(outViewWrapper, serializerInstance);
-                       }
-               }
+               LinkedOptionalMap<Class<?>, SerializableSerializer<?>> 
defaultKryoSerializers) throws IOException {
+
+               writeOptionalMap(
+                       out,
+                       defaultKryoSerializers,
+                       (stream, klass) -> stream.writeUTF(klass.getName()),
+                       (stream, instance) -> {
+                               try (final DataOutputViewStream outViewWrapper 
= new DataOutputViewStream(stream)) {
+                                       
InstantiationUtil.serializeObject(outViewWrapper, instance);
+                               }
+                       });
        }
 
        private static void writeDefaultKryoSerializerClasses(
@@ -164,14 +165,12 @@ final class KryoSerializerSnapshotData<T> {
                LinkedOptionalMap<Class<?>, Class<? extends Serializer<?>>> 
defaultKryoSerializerClasses)
                throws IOException {
 
-               out.writeInt(defaultKryoSerializerClasses.size());
-
-               for (Entry<Class<?>, Class<? extends Serializer<?>>> entry : 
defaultKryoSerializerClasses.unwrapOptionals().entrySet()) {
-                       Class<?> javaClass = entry.getKey();
-                       Class<? extends Serializer<?>> serializerClass = 
entry.getValue();
-                       out.writeUTF(javaClass.getName());
-                       out.writeUTF(serializerClass.getName());
-               }
+               writeOptionalMap(
+                       out,
+                       defaultKryoSerializerClasses,
+                       (stream, klass) -> stream.writeUTF(klass.getName()),
+                       (stream, klass) -> stream.writeUTF(klass.getName())
+               );
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -186,43 +185,19 @@ final class KryoSerializerSnapshotData<T> {
                DataInputView in,
                ClassLoader userCodeClassLoader) throws IOException {
 
-               LinkedOptionalMap<String, KryoRegistration> registrations = new 
LinkedOptionalMap<>();
-               final int size = in.readInt();
-               for (int i = 0; i < size; i++) {
-                       final String name = in.readUTF();
-                       KryoRegistration kryoRegistration = 
KryoRegistrationUtil.tryReadKryoRegistration(
-                                       in,
-                                       userCodeClassLoader);
-                       registrations.put(name, name, kryoRegistration);
-               }
-
-               return registrations;
+               return readOptionalMap(
+                       in,
+                       (stream, unused) -> stream.readUTF(),
+                       (stream, unused) -> 
KryoRegistrationUtil.tryReadKryoRegistration(stream, userCodeClassLoader)
+               );
        }
 
+       @SuppressWarnings("unchecked")
        private static LinkedOptionalMap<Class<?>, SerializableSerializer<?>> 
readDefaultKryoSerializers(DataInputView in, ClassLoader cl) throws IOException 
{
-               LinkedOptionalMap<Class<?>, SerializableSerializer<?>> 
kryoSerializers = new LinkedOptionalMap<>();
-               final int size = in.readInt();
-               for (int i = 0; i < size; i++) {
-                       final String className = in.readUTF();
-                       Class<?> javaClass = null;
-                       try {
-                               javaClass = Class.forName(className, false, cl);
-                       }
-                       catch (ClassNotFoundException e) {
-                               LOG.warn("Cannot find registered class " + 
className + " for Kryo serialization in classpath.", e);
-                       }
-                       SerializableSerializer<?> kryoSerializer = null;
-                       try {
-                               try (final DataInputViewStream inViewWrapper = 
new DataInputViewStream(in)) {
-                                       kryoSerializer = 
InstantiationUtil.deserializeObject(inViewWrapper, cl);
-                               }
-                       }
-                       catch (Throwable e) {
-                               LOG.warn("Cannot deserialize a previously 
serialized kryo serializer for the type " + className, e);
-                       }
-                       kryoSerializers.put(className, javaClass, 
kryoSerializer);
-               }
-               return kryoSerializers;
+               return readOptionalMap(
+                       in,
+                       new ClassResolverByName(cl),
+                       new SerializeableSerializerResolver(cl));
        }
 
        @SuppressWarnings("unchecked")
@@ -230,28 +205,7 @@ final class KryoSerializerSnapshotData<T> {
                DataInputView in,
                ClassLoader cl) throws IOException {
 
-               LinkedOptionalMap<Class<?>, Class<? extends Serializer<?>>> 
kryoSerializerClasses = new LinkedOptionalMap<>();
-               final int size = in.readInt();
-               for (int i = 0; i < size; i++) {
-                       final String className = in.readUTF();
-                       Class<?> typeClass = null;
-                       try {
-                               typeClass = Class.forName(className, false, cl);
-                       }
-                       catch (ClassNotFoundException e) {
-                               LOG.warn("Cannot find registered class " + 
className + " for Kryo serialization in classpath.", e);
-                       }
-                       final String kryoSerializerClassName = in.readUTF();
-                       Class<? extends Serializer<?>> kryoSerializerClass = 
null;
-                       try {
-                               kryoSerializerClass = (Class<? extends 
Serializer<?>>) Class.forName(kryoSerializerClassName, false, cl);
-                       }
-                       catch (Throwable e) {
-                               LOG.warn("Cannot find registered class " + 
className + " for Kryo serialization in classpath.", e);
-                       }
-                       kryoSerializerClasses.put(className, typeClass, 
kryoSerializerClass);
-               }
-               return kryoSerializerClasses;
+               return readOptionalMap(in, new ClassResolverByName(cl), new 
ClassResolverByName<Serializer<?>>(cl));
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -261,14 +215,13 @@ final class KryoSerializerSnapshotData<T> {
        private static final class KryoRegistrationUtil {
 
                static void writeKryoRegistration(
-                               KryoRegistration kryoRegistration,
-                               DataOutputView out) throws IOException {
+                       DataOutputView out, KryoRegistration kryoRegistration) 
throws IOException {
 
                        checkNotNull(kryoRegistration);
                        
out.writeUTF(kryoRegistration.getRegisteredClass().getName());
 
                        final KryoRegistration.SerializerDefinitionType 
serializerDefinitionType =
-                                       
kryoRegistration.getSerializerDefinitionType();
+                               kryoRegistration.getSerializerDefinitionType();
 
                        out.writeInt(serializerDefinitionType.ordinal());
                        switch (serializerDefinitionType) {
@@ -290,14 +243,14 @@ final class KryoSerializerSnapshotData<T> {
                                }
                                default: {
                                        throw new IllegalStateException(
-                                                       "Unrecognized Kryo 
registration serializer definition type: " + serializerDefinitionType);
+                                               "Unrecognized Kryo registration 
serializer definition type: " + serializerDefinitionType);
                                }
                        }
                }
 
                static KryoRegistration tryReadKryoRegistration(
-                               DataInputView in,
-                               ClassLoader userCodeClassLoader) throws 
IOException {
+                       DataInputView in,
+                       ClassLoader userCodeClassLoader) throws IOException {
 
                        String registeredClassname = in.readUTF();
                        Class<?> registeredClass;
@@ -306,12 +259,12 @@ final class KryoSerializerSnapshotData<T> {
                        }
                        catch (ClassNotFoundException e) {
                                LOG.warn("Cannot find registered class " + 
registeredClassname + " for Kryo serialization in classpath;" +
-                                               " using a dummy class as a 
placeholder.", e);
+                                       " using a dummy class as a 
placeholder.", e);
                                return null;
                        }
 
                        final KryoRegistration.SerializerDefinitionType 
serializerDefinitionType =
-                                       
KryoRegistration.SerializerDefinitionType.values()[in.readInt()];
+                               
KryoRegistration.SerializerDefinitionType.values()[in.readInt()];
 
                        switch (serializerDefinitionType) {
                                case UNSPECIFIED: {
@@ -325,17 +278,17 @@ final class KryoSerializerSnapshotData<T> {
                                }
                                default: {
                                        throw new IllegalStateException(
-                                                       "Unrecognized Kryo 
registration serializer definition type: " + serializerDefinitionType);
+                                               "Unrecognized Kryo registration 
serializer definition type: " + serializerDefinitionType);
                                }
                        }
                }
 
                @SuppressWarnings("unchecked")
                private static KryoRegistration tryReadWithSerializerClass(
-                               DataInputView in,
-                               ClassLoader userCodeClassLoader,
-                               String registeredClassname,
-                               Class<?> registeredClass) throws IOException {
+                       DataInputView in,
+                       ClassLoader userCodeClassLoader,
+                       String registeredClassname,
+                       Class<?> registeredClass) throws IOException {
                        String serializerClassname = in.readUTF();
                        Class serializerClass;
                        try {
@@ -344,17 +297,17 @@ final class KryoSerializerSnapshotData<T> {
                        }
                        catch (ClassNotFoundException e) {
                                LOG.warn("Cannot find registered Kryo 
serializer class for class " + registeredClassname +
-                                               " in classpath; using a dummy 
Kryo serializer that should be replaced as soon as" +
-                                               " a new Kryo serializer for the 
class is present", e);
+                                       " in classpath; using a dummy Kryo 
serializer that should be replaced as soon as" +
+                                       " a new Kryo serializer for the class 
is present", e);
                        }
                        return null;
                }
 
                private static KryoRegistration tryReadWithSerializerInstance(
-                               DataInputView in,
-                               ClassLoader userCodeClassLoader,
-                               String registeredClassname,
-                               Class<?> registeredClass) throws IOException {
+                       DataInputView in,
+                       ClassLoader userCodeClassLoader,
+                       String registeredClassname,
+                       Class<?> registeredClass) throws IOException {
                        SerializableSerializer<? extends Serializer<?>> 
serializerInstance;
 
                        try (final DataInputViewStream inViewWrapper = new 
DataInputViewStream(in)) {
@@ -363,17 +316,60 @@ final class KryoSerializerSnapshotData<T> {
                        }
                        catch (ClassNotFoundException e) {
                                LOG.warn("Cannot find registered Kryo 
serializer class for class " + registeredClassname +
-                                               " in classpath; using a dummy 
Kryo serializer that should be replaced as soon as" +
-                                               " a new Kryo serializer for the 
class is present", e);
+                                       " in classpath; using a dummy Kryo 
serializer that should be replaced as soon as" +
+                                       " a new Kryo serializer for the class 
is present", e);
                        }
                        catch (InvalidClassException e) {
                                LOG.warn("The registered Kryo serializer class 
for class " + registeredClassname +
-                                               " has changed and is no longer 
valid; using a dummy Kryo serializer that should be replaced" +
-                                               " as soon as a new Kryo 
serializer for the class is present.", e);
+                                       " has changed and is no longer valid; 
using a dummy Kryo serializer that should be replaced" +
+                                       " as soon as a new Kryo serializer for 
the class is present.", e);
 
                        }
                        return null;
                }
 
        }
+
+       private static class ClassResolverByName<T> implements 
BiFunctionWithException<DataInputView, String, Class<T>, IOException> {
+               private final ClassLoader classLoader;
+
+               private ClassResolverByName(ClassLoader classLoader) {
+                       this.classLoader = classLoader;
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public Class<T> apply(DataInputView stream, String unused) 
throws IOException {
+                       String className = stream.readUTF();
+                       try {
+                               return (Class<T>) Class.forName(className, 
false, classLoader);
+                       }
+                       catch (ClassNotFoundException e) {
+                               LOG.warn("Cannot find registered class " + 
className + " for Kryo serialization in classpath.", e);
+                               return null;
+                       }
+               }
+       }
+
+       private static final class SerializeableSerializerResolver implements 
BiFunctionWithException<DataInputView, String, SerializableSerializer<?>, 
IOException> {
+
+               private final ClassLoader classLoader;
+
+               private SerializeableSerializerResolver(ClassLoader 
classLoader) {
+                       this.classLoader = classLoader;
+               }
+
+               @Override
+               public SerializableSerializer<?> apply(DataInputView stream, 
String className) {
+                       try {
+                               try (final DataInputViewStream inViewWrapper = 
new DataInputViewStream(stream)) {
+                                       return 
InstantiationUtil.deserializeObject(inViewWrapper, classLoader);
+                               }
+                       }
+                       catch (Throwable e) {
+                               LOG.warn("Cannot deserialize a previously 
serialized kryo serializer for the type " + className, e);
+                               return null;
+                       }
+               }
+       }
 }

Reply via email to