[FLINK-1399] Fix Kryo registration of types and make sure that tags are 
assigned for registered types.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0b6af20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0b6af20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0b6af20

Branch: refs/heads/master
Commit: a0b6af20beed6d12d0f33f6f58f323b20cc12961
Parents: 2d4ed15
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jan 18 13:04:07 2015 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 18 13:59:39 2015 -0800

----------------------------------------------------------------------
 .../flink/api/java/ExecutionEnvironment.java    |  50 ++++++-
 .../java/typeutils/runtime/KryoSerializer.java  | 147 ++++++++++++++++---
 .../flink/api/scala/ExecutionEnvironment.scala  |  19 ++-
 3 files changed, 184 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0b6af20/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1753af8..77fed97 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.UUID;
 
 import com.esotericsoftware.kryo.Serializer;
+
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -201,20 +202,57 @@ public abstract class ExecutionEnvironment {
                return this.executionId.toString();
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       //  Registry for types and serializers
+       // 
--------------------------------------------------------------------------------------------
+       
        /**
-        * Registers the given Serializer as a default serializer for the given 
class at the
+        * Registers the given Serializer as a default serializer for the given 
type at the
         * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}.
+        * 
+        * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
+        * because it may be distributed to the worker nodes by java 
serialization.
+        * 
+        * @param type The class of the types serialized with the given 
serializer.
+        * @param serializer The serializer to use.
         */
-       public void addDefaultKryoSerializer(Class<?> clazz, Serializer<?> 
serializer) {
-               KryoSerializer.addDefaultSerializer(clazz, serializer);
+       public void registerKryoSerializer(Class<?> type, Serializer<?> 
serializer) {
+               if (type == null || serializer == null) {
+                       throw new NullPointerException("Cannot register null 
class or serializer.");
+               }
+               
+               KryoSerializer.registerSerializer(type, serializer);
        }
 
        /**
-        * Registers the given Serializer as a default serializer for the given 
class at the
+        * Registers the given Serializer via its class as a serializer for the 
given type at the
         * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}.
+        * 
+        * @param type The class of the types serialized with the given 
serializer.
+        * @param serializerClass The class of the serializer to use.
         */
-       public void addDefaultKryoSerializer(Class<?> clazz, Class<? extends 
Serializer<?>> serializer) {
-               KryoSerializer.addDefaultSerializer(clazz, serializer);
+       public void registerKryoSerializer(Class<?> type, Class<? extends 
Serializer<?>> serializerClass) {
+               if (type == null || serializerClass == null) {
+                       throw new NullPointerException("Cannot register null 
class or serializer.");
+               }
+               
+               KryoSerializer.registerSerializer(type, serializerClass);
+       }
+       
+       /**
+        * Registers the given type with the serialization stack. If the type 
is eventually
+        * serialized as a POJO, then the type is registered with the POJO 
serializer. If the
+        * type ends up being serialized with Kryo, then it will be registered 
at Kryo to make
+        * sure that only tags are written.
+        *  
+        * @param type The class of the type to register.
+        */
+       public void registerType(Class<?> type) {
+               if (type == null) {
+                       throw new NullPointerException("Cannot register null 
type class.");
+               }
+               
+               KryoSerializer.registerType(type);
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b6af20/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 0a5c4d9..bec5d59 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
@@ -34,19 +35,40 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
-
+import java.util.Set;
+
+/**
+ * A type serializer that serializes its type using the Kryo serialization
+ * framework (https://github.com/EsotericSoftware/kryo).
+ * 
+ * This serializer is intended as a fallback serializer for the cases that are
+ * not covered by the basic types, tuples, and POJOs.
+ *
+ * @param <T> The type to be serialized.
+ */
 public class KryoSerializer<T> extends TypeSerializer<T> {
+       
        private static final long serialVersionUID = 3L;
 
        private static Map<Class<?>, Serializer<?>> staticRegisteredSerializers 
= new HashMap<Class<?>, Serializer<?>>();
        private static Map<Class<?>, Class<? extends Serializer<?>>> 
staticRegisteredSerializersClasses = new HashMap<Class<?>, Class<? extends 
Serializer<?>>>();
-
-       private static Map<Class<?>, Serializer<?>> registeredSerializers;
-       private static Map<Class<?>, Class<? extends Serializer<?>>> 
registeredSerializersClasses;
+       
+       private static Set<Class<?>> staticRegisteredTypes = new 
HashSet<Class<?>>();
+       
+       // 
------------------------------------------------------------------------
+       
+       private final Map<Class<?>, Serializer<?>> registeredSerializers;
+       private final Map<Class<?>, Class<? extends Serializer<?>>> 
registeredSerializersClasses;
+       private final Set<Class<?>> registeredTypes;
 
        private final Class<T> type;
+       
+       // 
------------------------------------------------------------------------
+       // The fields below are lazily initialized after de-serialization
 
        private transient Kryo kryo;
        private transient T copyInstance;
@@ -56,6 +78,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        
        private transient Input input;
        private transient Output output;
+       
+       // 
------------------------------------------------------------------------
 
        public KryoSerializer(Class<T> type){
                if(type == null){
@@ -63,10 +87,26 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                }
                this.type = type;
 
-               this.registeredSerializers = staticRegisteredSerializers;
-               this.registeredSerializersClasses = 
staticRegisteredSerializersClasses;
+               // create copies of the statically registered serializers
+               // we use static synchronization to safeguard against 
concurrent use
+               // of the static collections.
+               synchronized (KryoSerializer.class) {
+                       this.registeredSerializers = 
staticRegisteredSerializers.isEmpty() ?
+                               Collections.<Class<?>, Serializer<?>>emptyMap() 
:
+                               new HashMap<Class<?>, 
Serializer<?>>(staticRegisteredSerializers);
+               
+                       this.registeredSerializersClasses = 
staticRegisteredSerializersClasses.isEmpty() ?
+                               Collections.<Class<?>, Class<? extends 
Serializer<?>>>emptyMap() :
+                               new HashMap<Class<?>, Class<? extends 
Serializer<?>>>(staticRegisteredSerializersClasses);
+                               
+                       this.registeredTypes = staticRegisteredTypes.isEmpty() ?
+                               Collections.<Class<?>>emptySet() :
+                               new HashSet<Class<?>>(staticRegisteredTypes);
+               }
+               
        }
 
+       // 
------------------------------------------------------------------------
 
        @Override
        public boolean isImmutableType() {
@@ -193,41 +233,104 @@ public class KryoSerializer<T> extends TypeSerializer<T> 
{
        private void checkKryoInitialized() {
                if (this.kryo == null) {
                        this.kryo = new ScalaKryoInstantiator().newKryo();
-                       this.kryo.addDefaultSerializer(Throwable.class, new 
JavaSerializer());
 
-                       for (Map.Entry<Class<?>, Serializer<?>> e: 
registeredSerializers.entrySet()) {
-                               this.kryo.addDefaultSerializer(e.getKey(), 
e.getValue());
-                       }
+                       // Throwable and all subclasses should be serialized 
via java serialization
+                       kryo.addDefaultSerializer(Throwable.class, new 
JavaSerializer());
 
-                       for (Map.Entry<Class<?>, Class<? extends 
Serializer<?>>> e: registeredSerializersClasses.entrySet()) {
-                               this.kryo.addDefaultSerializer(e.getKey(), 
e.getValue());
+                       // register the type of our class
+                       kryo.register(type);
+                       
+                       // register given types. we do this first so that any 
registration of a
+                       // more specific serializer overrides this
+                       for (Class<?> type : registeredTypes) {
+                               kryo.register(type);
+                       }
+                       
+                       // register given serializer classes
+                       for (Map.Entry<Class<?>, Class<? extends 
Serializer<?>>> e : registeredSerializersClasses.entrySet()) {
+                               Class<?> typeClass = e.getKey();
+                               Class<? extends Serializer<?>> serializerClass 
= e.getValue();
+                               
+                               Serializer<?> serializer = 
+                                               
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
+                               kryo.register(typeClass, serializer);
                        }
 
-                       this.kryo.setRegistrationRequired(false);
-                       this.kryo.register(type);
-
-                       
this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+                       // register given serializers
+                       for (Map.Entry<Class<?>, Serializer<?>> e : 
registeredSerializers.entrySet()) {
+                               kryo.register(e.getKey(), e.getValue());
+                       }
+                       
+                       kryo.setRegistrationRequired(false);
+                       
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
                }
        }
+       
+       // 
--------------------------------------------------------------------------------------------
+       // For registering custom serializers and types
+       // 
--------------------------------------------------------------------------------------------
 
        /**
         * Registers the given Serializer as a default serializer for the given 
class at the Kryo
         * instance.
+        * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
+        * because it may be distributed to the worker nodes by java 
serialization.
+        * 
+        * @param clazz The class of the types serialized with the given 
serializer.
+        * @param serializer The serializer to use.
+        * @throws IllegalArgumentException Thrown, if the serializer is not 
serializable.
         */
-       public static void addDefaultSerializer(Class<?> clazz, Serializer<?> 
serializer) {
-               staticRegisteredSerializers.put(clazz, serializer);
+       public static void registerSerializer(Class<?> clazz, Serializer<?> 
serializer) {
+               if (clazz == null || serializer == null) {
+                       throw new NullPointerException("Cannot register null 
class or serializer.");
+               }
+               if (!(serializer instanceof java.io.Serializable)) {
+                       throw new IllegalArgumentException("The serializer 
instance must be serializable, (for distributing it in the cluster), "
+                                       + "as defined by java.io.Serializable. 
For stateless serializers, you can use the "
+                                       + "'registerSerializer(Class, Class)' 
method to register the serializer via its class.");
+               }
+               
+               synchronized (KryoSerializer.class) {
+                       staticRegisteredSerializers.put(clazz, serializer);
+               }
        }
 
        /**
-        * Registers the given Serializer as a default serializer for the given 
class at the Kryo
+        * Registers a serializer via its class as a default serializer for the 
given class at the Kryo
         * instance.
+        * 
+        * @param clazz The class of the types serialized with the given 
serializer.
+        * @param serializerClass The serializer to use.
         */
-       public static void addDefaultSerializer(Class<?> clazz, Class<? extends 
Serializer<?>> serializer) {
-               staticRegisteredSerializersClasses.put(clazz, serializer);
+       public static void registerSerializer(Class<?> clazz, Class<? extends 
Serializer<?>> serializerClass) {
+               if (clazz == null || serializerClass == null) {
+                       throw new NullPointerException("Cannot register null 
class or serializer.");
+               }
+               
+               synchronized (KryoSerializer.class) {
+                       staticRegisteredSerializersClasses.put(clazz, 
serializerClass);
+               }
+       }
+       
+       /**
+        * Registers the given type with Kryo. Registering the type allows Kryo 
to write abbreviated
+        * name tags, rather than full class names, thereby vastly increasing 
the serialization
+        * performance in many cases.
+        *  
+        * @param type The class of the type to register.
+        */
+       public static void registerType(Class<?> type) {
+               if (type == null) {
+                       throw new NullPointerException("Cannot register null 
type class.");
+               }
+               
+               synchronized (KryoSerializer.class) {
+                       staticRegisteredTypes.add(type);
+               }
        }
 
        // 
--------------------------------------------------------------------------------------------
-       // for testing
+       // For testing
        // 
--------------------------------------------------------------------------------------------
        
        Kryo getKryo() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b6af20/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 565b9b1..391986d 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -127,16 +127,27 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Registers the given Serializer as a default serializer for the given 
class at the
    * [[KryoSerializer]].
    */
-  def addDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): 
Unit = {
-    javaEnv.addDefaultKryoSerializer(clazz, serializer)
+  def registerKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit 
= {
+    javaEnv.registerKryoSerializer(clazz, serializer)
   }
 
   /**
    * Registers the given Serializer as a default serializer for the given 
class at the
    * [[KryoSerializer]]
    */
-  def addDefaultKryoSerializer(clazz: Class[_], serializer: Class[_ <: 
Serializer[_]]) {
-    javaEnv.addDefaultKryoSerializer(clazz, serializer)
+  def registerKryoSerializer(clazz: Class[_], serializer: Class[_ <: 
Serializer[_]]) {
+    javaEnv.registerKryoSerializer(clazz, serializer)
+  }
+  
+  /**
+   * Registers the given type with the serialization stack. If the type is 
eventually
+   * serialized as a POJO, then the type is registered with the POJO 
serializer. If the
+   * type ends up being serialized with Kryo, then it will be registered at 
Kryo to make
+   * sure that only tags are written.
+   * 
+   */
+  def registerType(typeClass: Class[_]) {
+    javaEnv.registerType(typeClass)
   }
 
   /**

Reply via email to