This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ff00541ec2dc04db1129a3db7dacbbf307622cc1 Author: Igal Shilman <[email protected]> AuthorDate: Wed Feb 27 21:13:52 2019 +0100 [FLINK-11773] [core] Use LinkedOptionalMapSerializer in Kryo-/PojoSerializerSnapshotData --- .../runtime/PojoSerializerSnapshotData.java | 65 +------ .../runtime/kryo/KryoSerializerSnapshotData.java | 208 ++++++++++----------- 2 files changed, 107 insertions(+), 166 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotData.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotData.java index 8999d77..c0f1100 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotData.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotData.java @@ -28,7 +28,6 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.LinkedOptionalMap; import org.apache.flink.util.function.BiConsumerWithException; import org.apache.flink.util.function.BiFunctionWithException; -import org.apache.flink.util.function.FunctionWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +39,8 @@ import java.util.LinkedHashMap; import java.util.Map; import static org.apache.flink.util.LinkedOptionalMap.optionalMapOf; +import static org.apache.flink.util.LinkedOptionalMapSerializer.readOptionalMap; +import static org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -220,67 +221,11 @@ final class PojoSerializerSnapshotData<T> { return String.format("missing-field-at-%d", fieldIndex); } - private static <K, V> void writeOptionalMap( - DataOutputView out, - LinkedOptionalMap<K, V> map, - BiConsumerWithException<DataOutputView, K, IOException> keyWriter, - BiConsumerWithException<DataOutputView, V, IOException> valueWriter) throws IOException { - - out.writeInt(map.size()); - map.forEach(((keyName, key, value) -> { - out.writeUTF(keyName); - - if (key == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - keyWriter.accept(out, key); - } - - if (value == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - valueWriter.accept(out, value); - } - })); - } - - private static <K, V> LinkedOptionalMap<K, V> readOptionalMap( - DataInputView in, - BiFunctionWithException<DataInputView, String, K, IOException> keyReader, - FunctionWithException<DataInputView, V, IOException> valueReader) throws IOException { - - long mapSize = in.readInt(); - - LinkedOptionalMap<K, V> map = new LinkedOptionalMap<>(); - for (int i = 0; i < mapSize; i++) { - String keyName = in.readUTF(); - - final K key; - if (in.readBoolean()) { - key = keyReader.apply(in, keyName); - } else { - key = null; - } - - final V value; - if (in.readBoolean()) { - value = valueReader.apply(in); - } else { - value = null; - } - - map.put(keyName, key, value); - } - return map; - } - private enum NoOpWriter implements BiConsumerWithException<DataOutputView, Object, IOException> { INSTANCE; @Override - public void accept(DataOutputView dataOutputView, Object o) throws IOException {} + public void accept(DataOutputView dataOutputView, Object o) {} @SuppressWarnings("unchecked") static <K> BiConsumerWithException<DataOutputView, K, IOException> noopWriter() { @@ -300,8 +245,8 @@ final class PojoSerializerSnapshotData<T> { }; } - private static FunctionWithException<DataInputView, TypeSerializerSnapshot<?>, IOException> snapshotReader(ClassLoader cl) { - return input -> { + private static BiFunctionWithException<DataInputView, String, TypeSerializerSnapshot<?>, IOException> snapshotReader(ClassLoader cl) { + return (input, unused) -> { try { return TypeSerializerSnapshot.readVersionedSnapshot(input, cl); } diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java index f04c8e8..16da1f9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java @@ -26,18 +26,21 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.LinkedOptionalMap; +import org.apache.flink.util.function.BiFunctionWithException; import com.esotericsoftware.kryo.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataOutput; import java.io.IOException; import java.io.InvalidClassException; import java.util.LinkedHashMap; -import java.util.Map.Entry; import java.util.function.Function; import static org.apache.flink.util.LinkedOptionalMap.optionalMapOf; +import static org.apache.flink.util.LinkedOptionalMapSerializer.readOptionalMap; +import static org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap; import static org.apache.flink.util.Preconditions.checkNotNull; final class KryoSerializerSnapshotData<T> { @@ -135,28 +138,26 @@ final class KryoSerializerSnapshotData<T> { DataOutputView out, LinkedOptionalMap<String, KryoRegistration> kryoRegistrations) throws IOException { - out.writeInt(kryoRegistrations.size()); - for (Entry<String, KryoRegistration> entry : kryoRegistrations.unwrapOptionals().entrySet()) { - out.writeUTF(entry.getKey()); - KryoRegistrationUtil.writeKryoRegistration(entry.getValue(), out); - } + writeOptionalMap( + out, + kryoRegistrations, + DataOutput::writeUTF, + KryoRegistrationUtil::writeKryoRegistration); } private void writeDefaultKryoSerializers( DataOutputView out, - LinkedOptionalMap<Class<?>, - SerializableSerializer<?>> defaultKryoSerializers) throws IOException { - - out.writeInt(defaultKryoSerializers.size()); - for (Entry<Class<?>, SerializableSerializer<?>> entry : defaultKryoSerializers.unwrapOptionals().entrySet()) { - Class<?> javaClass = entry.getKey(); - SerializableSerializer<?> serializerInstance = entry.getValue(); - - out.writeUTF(javaClass.getName()); - try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(out)) { - InstantiationUtil.serializeObject(outViewWrapper, serializerInstance); - } - } + LinkedOptionalMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers) throws IOException { + + writeOptionalMap( + out, + defaultKryoSerializers, + (stream, klass) -> stream.writeUTF(klass.getName()), + (stream, instance) -> { + try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(stream)) { + InstantiationUtil.serializeObject(outViewWrapper, instance); + } + }); } private static void writeDefaultKryoSerializerClasses( @@ -164,14 +165,12 @@ final class KryoSerializerSnapshotData<T> { LinkedOptionalMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses) throws IOException { - out.writeInt(defaultKryoSerializerClasses.size()); - - for (Entry<Class<?>, Class<? extends Serializer<?>>> entry : defaultKryoSerializerClasses.unwrapOptionals().entrySet()) { - Class<?> javaClass = entry.getKey(); - Class<? extends Serializer<?>> serializerClass = entry.getValue(); - out.writeUTF(javaClass.getName()); - out.writeUTF(serializerClass.getName()); - } + writeOptionalMap( + out, + defaultKryoSerializerClasses, + (stream, klass) -> stream.writeUTF(klass.getName()), + (stream, klass) -> stream.writeUTF(klass.getName()) + ); } // -------------------------------------------------------------------------------------------- @@ -186,43 +185,19 @@ final class KryoSerializerSnapshotData<T> { DataInputView in, ClassLoader userCodeClassLoader) throws IOException { - LinkedOptionalMap<String, KryoRegistration> registrations = new LinkedOptionalMap<>(); - final int size = in.readInt(); - for (int i = 0; i < size; i++) { - final String name = in.readUTF(); - KryoRegistration kryoRegistration = KryoRegistrationUtil.tryReadKryoRegistration( - in, - userCodeClassLoader); - registrations.put(name, name, kryoRegistration); - } - - return registrations; + return readOptionalMap( + in, + (stream, unused) -> stream.readUTF(), + (stream, unused) -> KryoRegistrationUtil.tryReadKryoRegistration(stream, userCodeClassLoader) + ); } + @SuppressWarnings("unchecked") private static LinkedOptionalMap<Class<?>, SerializableSerializer<?>> readDefaultKryoSerializers(DataInputView in, ClassLoader cl) throws IOException { - LinkedOptionalMap<Class<?>, SerializableSerializer<?>> kryoSerializers = new LinkedOptionalMap<>(); - final int size = in.readInt(); - for (int i = 0; i < size; i++) { - final String className = in.readUTF(); - Class<?> javaClass = null; - try { - javaClass = Class.forName(className, false, cl); - } - catch (ClassNotFoundException e) { - LOG.warn("Cannot find registered class " + className + " for Kryo serialization in classpath.", e); - } - SerializableSerializer<?> kryoSerializer = null; - try { - try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { - kryoSerializer = InstantiationUtil.deserializeObject(inViewWrapper, cl); - } - } - catch (Throwable e) { - LOG.warn("Cannot deserialize a previously serialized kryo serializer for the type " + className, e); - } - kryoSerializers.put(className, javaClass, kryoSerializer); - } - return kryoSerializers; + return readOptionalMap( + in, + new ClassResolverByName(cl), + new SerializeableSerializerResolver(cl)); } @SuppressWarnings("unchecked") @@ -230,28 +205,7 @@ final class KryoSerializerSnapshotData<T> { DataInputView in, ClassLoader cl) throws IOException { - LinkedOptionalMap<Class<?>, Class<? extends Serializer<?>>> kryoSerializerClasses = new LinkedOptionalMap<>(); - final int size = in.readInt(); - for (int i = 0; i < size; i++) { - final String className = in.readUTF(); - Class<?> typeClass = null; - try { - typeClass = Class.forName(className, false, cl); - } - catch (ClassNotFoundException e) { - LOG.warn("Cannot find registered class " + className + " for Kryo serialization in classpath.", e); - } - final String kryoSerializerClassName = in.readUTF(); - Class<? extends Serializer<?>> kryoSerializerClass = null; - try { - kryoSerializerClass = (Class<? extends Serializer<?>>) Class.forName(kryoSerializerClassName, false, cl); - } - catch (Throwable e) { - LOG.warn("Cannot find registered class " + className + " for Kryo serialization in classpath.", e); - } - kryoSerializerClasses.put(className, typeClass, kryoSerializerClass); - } - return kryoSerializerClasses; + return readOptionalMap(in, new ClassResolverByName(cl), new ClassResolverByName<Serializer<?>>(cl)); } // -------------------------------------------------------------------------------------------- @@ -261,14 +215,13 @@ final class KryoSerializerSnapshotData<T> { private static final class KryoRegistrationUtil { static void writeKryoRegistration( - KryoRegistration kryoRegistration, - DataOutputView out) throws IOException { + DataOutputView out, KryoRegistration kryoRegistration) throws IOException { checkNotNull(kryoRegistration); out.writeUTF(kryoRegistration.getRegisteredClass().getName()); final KryoRegistration.SerializerDefinitionType serializerDefinitionType = - kryoRegistration.getSerializerDefinitionType(); + kryoRegistration.getSerializerDefinitionType(); out.writeInt(serializerDefinitionType.ordinal()); switch (serializerDefinitionType) { @@ -290,14 +243,14 @@ final class KryoSerializerSnapshotData<T> { } default: { throw new IllegalStateException( - "Unrecognized Kryo registration serializer definition type: " + serializerDefinitionType); + "Unrecognized Kryo registration serializer definition type: " + serializerDefinitionType); } } } static KryoRegistration tryReadKryoRegistration( - DataInputView in, - ClassLoader userCodeClassLoader) throws IOException { + DataInputView in, + ClassLoader userCodeClassLoader) throws IOException { String registeredClassname = in.readUTF(); Class<?> registeredClass; @@ -306,12 +259,12 @@ final class KryoSerializerSnapshotData<T> { } catch (ClassNotFoundException e) { LOG.warn("Cannot find registered class " + registeredClassname + " for Kryo serialization in classpath;" + - " using a dummy class as a placeholder.", e); + " using a dummy class as a placeholder.", e); return null; } final KryoRegistration.SerializerDefinitionType serializerDefinitionType = - KryoRegistration.SerializerDefinitionType.values()[in.readInt()]; + KryoRegistration.SerializerDefinitionType.values()[in.readInt()]; switch (serializerDefinitionType) { case UNSPECIFIED: { @@ -325,17 +278,17 @@ final class KryoSerializerSnapshotData<T> { } default: { throw new IllegalStateException( - "Unrecognized Kryo registration serializer definition type: " + serializerDefinitionType); + "Unrecognized Kryo registration serializer definition type: " + serializerDefinitionType); } } } @SuppressWarnings("unchecked") private static KryoRegistration tryReadWithSerializerClass( - DataInputView in, - ClassLoader userCodeClassLoader, - String registeredClassname, - Class<?> registeredClass) throws IOException { + DataInputView in, + ClassLoader userCodeClassLoader, + String registeredClassname, + Class<?> registeredClass) throws IOException { String serializerClassname = in.readUTF(); Class serializerClass; try { @@ -344,17 +297,17 @@ final class KryoSerializerSnapshotData<T> { } catch (ClassNotFoundException e) { LOG.warn("Cannot find registered Kryo serializer class for class " + registeredClassname + - " in classpath; using a dummy Kryo serializer that should be replaced as soon as" + - " a new Kryo serializer for the class is present", e); + " in classpath; using a dummy Kryo serializer that should be replaced as soon as" + + " a new Kryo serializer for the class is present", e); } return null; } private static KryoRegistration tryReadWithSerializerInstance( - DataInputView in, - ClassLoader userCodeClassLoader, - String registeredClassname, - Class<?> registeredClass) throws IOException { + DataInputView in, + ClassLoader userCodeClassLoader, + String registeredClassname, + Class<?> registeredClass) throws IOException { SerializableSerializer<? extends Serializer<?>> serializerInstance; try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { @@ -363,17 +316,60 @@ final class KryoSerializerSnapshotData<T> { } catch (ClassNotFoundException e) { LOG.warn("Cannot find registered Kryo serializer class for class " + registeredClassname + - " in classpath; using a dummy Kryo serializer that should be replaced as soon as" + - " a new Kryo serializer for the class is present", e); + " in classpath; using a dummy Kryo serializer that should be replaced as soon as" + + " a new Kryo serializer for the class is present", e); } catch (InvalidClassException e) { LOG.warn("The registered Kryo serializer class for class " + registeredClassname + - " has changed and is no longer valid; using a dummy Kryo serializer that should be replaced" + - " as soon as a new Kryo serializer for the class is present.", e); + " has changed and is no longer valid; using a dummy Kryo serializer that should be replaced" + + " as soon as a new Kryo serializer for the class is present.", e); } return null; } } + + private static class ClassResolverByName<T> implements BiFunctionWithException<DataInputView, String, Class<T>, IOException> { + private final ClassLoader classLoader; + + private ClassResolverByName(ClassLoader classLoader) { + this.classLoader = classLoader; + } + + @SuppressWarnings("unchecked") + @Override + public Class<T> apply(DataInputView stream, String unused) throws IOException { + String className = stream.readUTF(); + try { + return (Class<T>) Class.forName(className, false, classLoader); + } + catch (ClassNotFoundException e) { + LOG.warn("Cannot find registered class " + className + " for Kryo serialization in classpath.", e); + return null; + } + } + } + + private static final class SerializeableSerializerResolver implements BiFunctionWithException<DataInputView, String, SerializableSerializer<?>, IOException> { + + private final ClassLoader classLoader; + + private SerializeableSerializerResolver(ClassLoader classLoader) { + this.classLoader = classLoader; + } + + @Override + public SerializableSerializer<?> apply(DataInputView stream, String className) { + try { + try (final DataInputViewStream inViewWrapper = new DataInputViewStream(stream)) { + return InstantiationUtil.deserializeObject(inViewWrapper, classLoader); + } + } + catch (Throwable e) { + LOG.warn("Cannot deserialize a previously serialized kryo serializer for the type " + className, e); + return null; + } + } + } }
