[FLINK-1399] Fix Kryo registration of types and make sure that tags are assigned for registered types.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0b6af20 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0b6af20 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0b6af20 Branch: refs/heads/master Commit: a0b6af20beed6d12d0f33f6f58f323b20cc12961 Parents: 2d4ed15 Author: Stephan Ewen <se...@apache.org> Authored: Sun Jan 18 13:04:07 2015 -0800 Committer: Stephan Ewen <se...@apache.org> Committed: Sun Jan 18 13:59:39 2015 -0800 ---------------------------------------------------------------------- .../flink/api/java/ExecutionEnvironment.java | 50 ++++++- .../java/typeutils/runtime/KryoSerializer.java | 147 ++++++++++++++++--- .../flink/api/scala/ExecutionEnvironment.scala | 19 ++- 3 files changed, 184 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a0b6af20/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 1753af8..77fed97 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.UUID; import com.esotericsoftware.kryo.Serializer; + import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; @@ -201,20 +202,57 @@ public abstract class ExecutionEnvironment { return this.executionId.toString(); } + // -------------------------------------------------------------------------------------------- + // Registry for types and serializers + // 
-------------------------------------------------------------------------------------------- + /** - * Registers the given Serializer as a default serializer for the given class at the + * Registers the given Serializer as a default serializer for the given type at the * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}. + * + * Note that the serializer instance must be serializable (as defined by java.io.Serializable), + * because it may be distributed to the worker nodes by java serialization. + * + * @param type The class of the types serialized with the given serializer. + * @param serializer The serializer to use. */ - public void addDefaultKryoSerializer(Class<?> clazz, Serializer<?> serializer) { - KryoSerializer.addDefaultSerializer(clazz, serializer); + public void registerKryoSerializer(Class<?> type, Serializer<?> serializer) { + if (type == null || serializer == null) { + throw new NullPointerException("Cannot register null class or serializer."); + } + + KryoSerializer.registerSerializer(type, serializer); } /** - * Registers the given Serializer as a default serializer for the given class at the + * Registers the given Serializer via its class as a serializer for the given type at the * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}. + * + * @param type The class of the types serialized with the given serializer. + * @param serializerClass The class of the serializer to use. */ - public void addDefaultKryoSerializer(Class<?> clazz, Class<? extends Serializer<?>> serializer) { - KryoSerializer.addDefaultSerializer(clazz, serializer); + public void registerKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) { + if (type == null || serializerClass == null) { + throw new NullPointerException("Cannot register null class or serializer."); + } + + KryoSerializer.registerSerializer(type, serializerClass); + } + + /** + * Registers the given type with the serialization stack. 
If the type is eventually + * serialized as a POJO, then the type is registered with the POJO serializer. If the + * type ends up being serialized with Kryo, then it will be registered at Kryo to make + * sure that only tags are written. + * + * @param type The class of the type to register. + */ + public void registerType(Class<?> type) { + if (type == null) { + throw new NullPointerException("Cannot register null type class."); + } + + KryoSerializer.registerType(type); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a0b6af20/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index 0a5c4d9..bec5d59 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; @@ -34,19 +35,40 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; - +import java.util.Set; + +/** + * A type serializer that serializes its type using the Kryo serialization + 
* framework (https://github.com/EsotericSoftware/kryo). + * + * This serializer is intended as a fallback serializer for the cases that are + * not covered by the basic types, tuples, and POJOs. + * + * @param <T> The type to be serialized. + */ public class KryoSerializer<T> extends TypeSerializer<T> { + private static final long serialVersionUID = 3L; private static Map<Class<?>, Serializer<?>> staticRegisteredSerializers = new HashMap<Class<?>, Serializer<?>>(); private static Map<Class<?>, Class<? extends Serializer<?>>> staticRegisteredSerializersClasses = new HashMap<Class<?>, Class<? extends Serializer<?>>>(); - - private static Map<Class<?>, Serializer<?>> registeredSerializers; - private static Map<Class<?>, Class<? extends Serializer<?>>> registeredSerializersClasses; + + private static Set<Class<?>> staticRegisteredTypes = new HashSet<Class<?>>(); + + // ------------------------------------------------------------------------ + + private final Map<Class<?>, Serializer<?>> registeredSerializers; + private final Map<Class<?>, Class<? 
extends Serializer<?>>> registeredSerializersClasses; + private final Set<Class<?>> registeredTypes; private final Class<T> type; + + // ------------------------------------------------------------------------ + // The fields below are lazily initialized after de-serialization private transient Kryo kryo; private transient T copyInstance; @@ -56,6 +78,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> { private transient Input input; private transient Output output; + + // ------------------------------------------------------------------------ public KryoSerializer(Class<T> type){ if(type == null){ @@ -63,10 +87,26 @@ public class KryoSerializer<T> extends TypeSerializer<T> { } this.type = type; - this.registeredSerializers = staticRegisteredSerializers; - this.registeredSerializersClasses = staticRegisteredSerializersClasses; + // create copies of the statically registered serializers + // we use static synchronization to safeguard against concurrent use + // of the static collections. + synchronized (KryoSerializer.class) { + this.registeredSerializers = staticRegisteredSerializers.isEmpty() ? + Collections.<Class<?>, Serializer<?>>emptyMap() : + new HashMap<Class<?>, Serializer<?>>(staticRegisteredSerializers); + + this.registeredSerializersClasses = staticRegisteredSerializersClasses.isEmpty() ? + Collections.<Class<?>, Class<? extends Serializer<?>>>emptyMap() : + new HashMap<Class<?>, Class<? extends Serializer<?>>>(staticRegisteredSerializersClasses); + + this.registeredTypes = staticRegisteredTypes.isEmpty() ? 
+ Collections.<Class<?>>emptySet() : + new HashSet<Class<?>>(staticRegisteredTypes); + } + } + // ------------------------------------------------------------------------ @Override public boolean isImmutableType() { @@ -193,41 +233,104 @@ public class KryoSerializer<T> extends TypeSerializer<T> { private void checkKryoInitialized() { if (this.kryo == null) { this.kryo = new ScalaKryoInstantiator().newKryo(); - this.kryo.addDefaultSerializer(Throwable.class, new JavaSerializer()); - for (Map.Entry<Class<?>, Serializer<?>> e: registeredSerializers.entrySet()) { - this.kryo.addDefaultSerializer(e.getKey(), e.getValue()); - } + // Throwable and all subclasses should be serialized via java serialization + kryo.addDefaultSerializer(Throwable.class, new JavaSerializer()); - for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e: registeredSerializersClasses.entrySet()) { - this.kryo.addDefaultSerializer(e.getKey(), e.getValue()); + // register the type of our class + kryo.register(type); + + // register given types. we do this first so that any registration of a + // more specific serializer overrides this + for (Class<?> type : registeredTypes) { + kryo.register(type); + } + + // register given serializer classes + for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredSerializersClasses.entrySet()) { + Class<?> typeClass = e.getKey(); + Class<? 
extends Serializer<?>> serializerClass = e.getValue(); + + Serializer<?> serializer = + ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass); + kryo.register(typeClass, serializer); } - this.kryo.setRegistrationRequired(false); - this.kryo.register(type); - - this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + // register given serializers + for (Map.Entry<Class<?>, Serializer<?>> e : registeredSerializers.entrySet()) { + kryo.register(e.getKey(), e.getValue()); + } + + kryo.setRegistrationRequired(false); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } } + + // -------------------------------------------------------------------------------------------- + // For registering custom serializers and types + // -------------------------------------------------------------------------------------------- /** * Registers the given Serializer as a default serializer for the given class at the Kryo * instance. + * Note that the serializer instance must be serializable (as defined by java.io.Serializable), + * because it may be distributed to the worker nodes by java serialization. + * + * @param clazz The class of the types serialized with the given serializer. + * @param serializer The serializer to use. + * @throws IllegalArgumentException Thrown, if the serializer is not serializable. */ - public static void addDefaultSerializer(Class<?> clazz, Serializer<?> serializer) { - staticRegisteredSerializers.put(clazz, serializer); + public static void registerSerializer(Class<?> clazz, Serializer<?> serializer) { + if (clazz == null || serializer == null) { + throw new NullPointerException("Cannot register null class or serializer."); + } + if (!(serializer instanceof java.io.Serializable)) { + throw new IllegalArgumentException("The serializer instance must be serializable, (for distributing it in the cluster), " + + "as defined by java.io.Serializable. 
For stateless serializers, you can use the " + + "'registerSerializer(Class, Class)' method to register the serializer via its class."); + } + + synchronized (KryoSerializer.class) { + staticRegisteredSerializers.put(clazz, serializer); + } } /** - * Registers the given Serializer as a default serializer for the given class at the Kryo + * Registers a serializer via its class as a default serializer for the given class at the Kryo * instance. + * + * @param clazz The class of the types serialized with the given serializer. + * @param serializerClass The serializer to use. */ - public static void addDefaultSerializer(Class<?> clazz, Class<? extends Serializer<?>> serializer) { - staticRegisteredSerializersClasses.put(clazz, serializer); + public static void registerSerializer(Class<?> clazz, Class<? extends Serializer<?>> serializerClass) { + if (clazz == null || serializerClass == null) { + throw new NullPointerException("Cannot register null class or serializer."); + } + + synchronized (KryoSerializer.class) { + staticRegisteredSerializersClasses.put(clazz, serializerClass); + } + } + + /** + * Registers the given type with Kryo. Registering the type allows Kryo to write abbreviated + * name tags, rather than full class names, thereby vastly increasing the serialization + * performance in many cases. + * + * @param type The class of the type to register. 
+ */ + public static void registerType(Class<?> type) { + if (type == null) { + throw new NullPointerException("Cannot register null type class."); + } + + synchronized (KryoSerializer.class) { + staticRegisteredTypes.add(type); + } } // -------------------------------------------------------------------------------------------- - // for testing + // For testing // -------------------------------------------------------------------------------------------- Kryo getKryo() { http://git-wip-us.apache.org/repos/asf/flink/blob/a0b6af20/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index 565b9b1..391986d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -127,16 +127,27 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { * Registers the given Serializer as a default serializer for the given class at the * [[KryoSerializer]]. */ - def addDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = { - javaEnv.addDefaultKryoSerializer(clazz, serializer) + def registerKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = { + javaEnv.registerKryoSerializer(clazz, serializer) } /** * Registers the given Serializer as a default serializer for the given class at the * [[KryoSerializer]] */ - def addDefaultKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) { - javaEnv.addDefaultKryoSerializer(clazz, serializer) + def registerKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) { + javaEnv.registerKryoSerializer(clazz, serializer) + } + + /** + * Registers the given type with the serialization stack. 
If the type is eventually + * serialized as a POJO, then the type is registered with the POJO serializer. If the + * type ends up being serialized with Kryo, then it will be registered at Kryo to make + * sure that only tags are written. + * + */ + def registerType(typeClass: Class[_]) { + javaEnv.registerType(typeClass) } /**