Repository: flink
Updated Branches:
  refs/heads/master 16b088218 -> 87bf57816


[FLINK-7420] [avro] Abstract all Avro interaction behind AvroUtils

Before, we would try and dynamicall load Avro-related classes in several
places. Now, we only reflectively instantiate the right AvroUtils and
all other operations are methods on this.

The default AvroUtils throw exceptions with a helpful message for most
operations.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65e87045
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65e87045
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65e87045

Branch: refs/heads/master
Commit: 65e87045c48ce3200ea6690d945ed87b061808af
Parents: 29249b2
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Oct 30 15:02:18 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Nov 3 16:40:34 2017 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/AvroUtils.java     | 120 +++++++++++++++++++
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  23 +---
 .../flink/api/java/typeutils/TypeExtractor.java |  33 +----
 .../typeutils/runtime/kryo/KryoSerializer.java  |   3 +-
 .../typeutils/runtime/kryo/Serializers.java     |  63 ++--------
 .../avro/utils/AvroKryoSerializerUtils.java     |  54 +++++++--
 6 files changed, 182 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
new file mode 100644
index 0000000..e19f5fb
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+
+import java.util.LinkedHashMap;
+
+import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
+
+/**
+ * Utility methods for dealing with Avro types. This has a default 
implementation for the case that
+ * Avro is not present on the classpath and an actual implementation in 
flink-avro that is
+ * dynamically loaded when present.
+ */
+public abstract class AvroUtils {
+
+       protected static final String AVRO_SPECIFIC_RECORD_BASE = 
"org.apache.avro.specific.SpecificRecordBase";
+
+       protected static final String AVRO_GENERIC_RECORD = 
"org.apache.avro.generic.GenericData$Record";
+
+       private static final String AVRO_KRYO_UTILS = 
"org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";
+
+       private static final String AVRO_GENERIC_DATA_ARRAY = 
"org.apache.avro.generic.GenericData$Array";
+
+       /**
+        * Loads the utility class from <code>flink-avro</code> and adds 
Avro-specific serializers. This
+        * method will throw an exception if we see an Avro type but flink-avro 
is not in the classpath.
+        */
+       public abstract void addAvroSerializersIfRequired(ExecutionConfig reg, 
Class<?> type);
+
+       /**
+        * Registers a special Serializer for GenericData.Array.
+        */
+       public abstract void addAvroGenericDataArrayRegistration(
+                       LinkedHashMap<String,
+                       KryoRegistration> kryoRegistrations);
+
+       /**
+        * Creates an {@code AvroSerializer} if flink-avro is present, 
otherwise throws an exception.
+        */
+       public abstract <T> TypeSerializer<T> createAvroSerializer(Class<T> 
type);
+
+       /**
+        * Creates an {@code AvroTypeInfo} if flink-avro is present, otherwise 
throws an exception.
+        */
+       public abstract <T> TypeInformation<T> createAvroTypeInfo(Class<T> 
type);
+
+       /**
+        * Returns either the default {@link AvroUtils} which throw an 
exception in cases where Avro
+        * would be needed or loads the specific utils for Avro from flink-avro.
+        */
+       public static AvroUtils getAvroUtils() {
+
+               // try and load the special AvroUtils from the flink-avro 
package
+               Class<?> clazz;
+               try {
+                       clazz = Class.forName(AVRO_KRYO_UTILS, false, 
AvroUtils.class.getClassLoader());
+               } catch (ClassNotFoundException e) {
+                       // cannot find the utils, return the default 
implementation
+                       return new DefaultAvroUtils();
+               }
+
+               try {
+                       return (AvroUtils) clazz.getConstructor().newInstance();
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not instantiate " + 
AVRO_KRYO_UTILS + ".");
+               }
+       }
+
+       private static class DefaultAvroUtils extends AvroUtils {
+               @Override
+               public void addAvroSerializersIfRequired(ExecutionConfig reg, 
Class<?> type) {
+                       if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || 
hasSuperclass(
+                               type,
+                               AVRO_GENERIC_RECORD)) {
+                               throw new RuntimeException("Could not load 
class '" + AVRO_KRYO_UTILS + "'. " +
+                                       "You may be missing the 'flink-avro' 
dependency.");
+                       }
+               }
+
+               @Override
+               public void 
addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) {
+                       kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY,
+                               new 
KryoRegistration(Serializers.DummyAvroRegisteredClass.class, (Class) 
Serializers.DummyAvroKryoSerializerClass.class));
+               }
+
+               @Override
+               public <T> TypeSerializer<T> createAvroSerializer(Class<T> 
type) {
+                       throw new RuntimeException("Could not load the 
AvroSerializer class. " +
+                               "You may be missing the 'flink-avro' 
dependency.");
+               }
+
+               @Override
+               public <T> TypeInformation<T> createAvroTypeInfo(Class<T> type) 
{
+                       throw new RuntimeException("Could not load the 
AvroTypeInfo class. " +
+                               "You may be missing the 'flink-avro' 
dependency.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index b24f425..2e893bb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -31,9 +31,7 @@ import 
org.apache.flink.api.java.typeutils.runtime.PojoComparator;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
-import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -303,27 +301,12 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        @PublicEvolving
        @SuppressWarnings("unchecked")
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-               if(config.isForceKryoEnabled()) {
+               if (config.isForceKryoEnabled()) {
                        return new KryoSerializer<>(getTypeClass(), config);
                }
 
-               if(config.isForceAvroEnabled()) {
-                       Class<?> clazz;
-                       try {
-                               clazz = 
Class.forName("org.apache.flink.formats.avro.typeutils.AvroSerializer");
-                       } catch (ClassNotFoundException e) {
-                               throw new RuntimeException("Could not load the 
AvroSerializer class. " +
-                                       "You may be missing the 'flink-avro' 
dependency.");
-                       }
-
-                       try {
-                               Constructor<?> constructor = 
clazz.getConstructor(Class.class);
-                               return (TypeSerializer<T>) 
constructor.newInstance(getTypeClass());
-                       } catch (NoSuchMethodException | IllegalAccessException 
| InstantiationException e) {
-                               throw new RuntimeException("Incompatible 
versions of the Avro classes found.");
-                       } catch (InvocationTargetException e) {
-                               throw new RuntimeException("Cannot create 
AvroSerializer.", e.getTargetException());
-                       }
+               if (config.isForceAvroEnabled()) {
+                       return 
AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
                }
 
                TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[fields.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 1a9cecb..8ea2e1a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -117,8 +117,6 @@ public class TypeExtractor {
 
        private static final String AVRO_SPECIFIC_RECORD_BASE_CLASS = 
"org.apache.avro.specific.SpecificRecordBase";
 
-       private static final String AVRO_TYPEINFO_CLASS = 
"org.apache.flink.formats.avro.typeutils.AvroTypeInfo";
-
        private static final Logger LOG = 
LoggerFactory.getLogger(TypeExtractor.class);
 
        public static final int[] NO_INDEX = new int[] {};
@@ -1794,7 +1792,7 @@ public class TypeExtractor {
 
                // special case for POJOs generated by Avro.
                if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
-                       return createAvroTypeInfo(clazz);
+                       return 
AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
                }
 
                if (Modifier.isInterface(clazz.getModifiers())) {
@@ -2175,33 +2173,4 @@ public class TypeExtractor {
                        // ignore
                }
        }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities to handle Avro's 'SpecificRecord' type via reflection
-       // 
------------------------------------------------------------------------
-
-       private static <T> TypeInformation<T> createAvroTypeInfo(Class<T> 
clazz) {
-               Class<?> typeInfoClass;
-               try {
-                       typeInfoClass = Class.forName(AVRO_TYPEINFO_CLASS, 
false, TypeExtractor.class.getClassLoader());
-               }
-               catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Could not load the 
TypeInformation for the class '"
-                                       + AVRO_TYPEINFO_CLASS + "'. You may be 
missing the 'flink-avro' dependency.");
-               }
-
-               try {
-                       Constructor<?> constr = 
typeInfoClass.getConstructor(Class.class);
-
-                       @SuppressWarnings("unchecked")
-                       TypeInformation<T> typeInfo = (TypeInformation<T>) 
constr.newInstance(clazz);
-                       return typeInfo;
-               }
-               catch (NoSuchMethodException | IllegalAccessException | 
InstantiationException e) {
-                       throw new RuntimeException("Incompatible versions of 
the Avro classes found.");
-               }
-               catch (InvocationTargetException e) {
-                       throw new RuntimeException("Cannot create 
AvroTypeInfo.", e.getTargetException());
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 269cf35..560e5b1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.AvroUtils;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
@@ -477,7 +478,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                }
 
                // add Avro support if flink-avro is available; a dummy 
otherwise
-               
Serializers.addAvroGenericDataArrayRegistration(kryoRegistrations);
+               
AvroUtils.getAvroUtils().addAvroGenericDataArrayRegistration(kryoRegistrations);
 
                return kryoRegistrations;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index de7b2fc..9659dc6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -22,10 +22,10 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.AvroUtils;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
@@ -36,19 +36,14 @@ import 
com.esotericsoftware.kryo.serializers.CollectionSerializer;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Set;
 
-import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
-
-
 /**
  * Class containing utilities for the serializers of the Flink Runtime.
  *
@@ -61,14 +56,6 @@ import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSupercl
 @Internal
 public class Serializers {
 
-       private static final String AVRO_SPECIFIC_RECORD_BASE = 
"org.apache.avro.specific.SpecificRecordBase";
-
-       private static final String AVRO_GENERIC_RECORD = 
"org.apache.avro.generic.GenericData$Record";
-
-       private static final String AVRO_KRYO_UTILS = 
"org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";
-
-       private static final String AVRO_GENERIC_DATA_ARRAY = 
"org.apache.avro.generic.GenericData$Array";
-
        public static void recursivelyRegisterType(TypeInformation<?> typeInfo, 
ExecutionConfig config, Set<Class<?>> alreadySeen) {
                if (typeInfo instanceof GenericTypeInfo) {
                        GenericTypeInfo<?> genericTypeInfo = 
(GenericTypeInfo<?>) typeInfo;
@@ -104,9 +91,7 @@ public class Serializers {
                else {
                        config.registerKryoType(type);
                        // add serializers for Avro type if necessary
-                       if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || 
hasSuperclass(type, AVRO_GENERIC_RECORD)) {
-                               addAvroSerializers(config, type);
-                       }
+                       
AvroUtils.getAvroUtils().addAvroSerializersIfRequired(config, type);
 
                        Field[] fields = type.getDeclaredFields();
                        for (Field field : fields) {
@@ -161,43 +146,19 @@ public class Serializers {
        }
 
        /**
-        * Loads the utility class from <code>flink-avro</code> and adds 
Avro-specific serializers.
+        * This is used in case we don't have Avro on the classpath. Flink 
versions before 1.4
+        * always registered special Serializers for Kryo but starting with 
Flink 1.4 we don't have
+        * Avro on the classpath by default anymore. We still have to retain 
the same registered
+        * Serializers for backwards compatibility of savepoints.
         */
-       private static void addAvroSerializers(ExecutionConfig reg, Class<?> 
type) {
-               Class<?> clazz;
-               try {
-                       clazz = Class.forName(AVRO_KRYO_UTILS, false, 
Serializers.class.getClassLoader());
-               }
-               catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Could not load class '" + 
AVRO_KRYO_UTILS + "'. " +
-                               "You may be missing the 'flink-avro' 
dependency.");
-               }
-               try {
-                       clazz.getDeclaredMethod("addAvroSerializers", 
ExecutionConfig.class, Class.class).invoke(null, reg, type);
-               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
-                       throw new RuntimeException("Could not access method in 
'flink-avro' dependency.", e);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       public static void 
addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) {
-               try {
-                       Class<?> clazz = Class.forName(AVRO_GENERIC_DATA_ARRAY, 
false, Serializers.class.getClassLoader());
-
-                       kryoRegistrations.put(
-                               AVRO_GENERIC_DATA_ARRAY,
-                               new KryoRegistration(
-                                               clazz,
-                                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
-               }
-               catch (ClassNotFoundException e) {
-                       kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY,
-                               new 
KryoRegistration(DummyAvroRegisteredClass.class, (Class) 
DummyAvroKryoSerializerClass.class));
-               }
-       }
-
        public static class DummyAvroRegisteredClass {}
 
+       /**
+        * This is used in case we don't have Avro on the classpath. Flink 
versions before 1.4
+        * always registered special Serializers for Kryo but starting with 
Flink 1.4 we don't have
+        * Avro on the classpath by default anymore. We still have to retain 
the same registered
+        * Serializers for backwards compatibility of savepoints.
+        */
        public static class DummyAvroKryoSerializerClass<T> extends 
Serializer<T> {
                @Override
                public void write(Kryo kryo, Output output, Object o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index 7305f23..c28f6cf 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -19,7 +19,13 @@
 package org.apache.flink.formats.avro.utils;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.AvroUtils;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.formats.avro.typeutils.AvroSerializer;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
@@ -29,22 +35,50 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 
 import java.io.Serializable;
+import java.util.LinkedHashMap;
+
+import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
 
 /**
  * Utilities for integrating Avro serializers in Kryo.
  */
-public class AvroKryoSerializerUtils {
+public class AvroKryoSerializerUtils extends AvroUtils {
+
+       @Override
+       public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> 
type) {
+               if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || 
hasSuperclass(type, AVRO_GENERIC_RECORD)) {
+                       // Avro POJOs contain java.util.List which have 
GenericData.Array as their runtime type
+                       // because Kryo is not able to serialize them properly, 
we use this serializer for them
+                       
reg.registerTypeWithKryoSerializer(GenericData.Array.class, 
Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
+
+                       // We register this serializer for users who want to 
use untyped Avro records (GenericData.Record).
+                       // Kryo is able to serialize everything in there, 
except for the Schema.
+                       // This serializer is very slow, but using the 
GenericData.Records of Kryo is in general a bad idea.
+                       // we add the serializer as a default serializer 
because Avro is using a private sub-type at runtime.
+                       reg.addDefaultKryoSerializer(Schema.class, 
AvroSchemaSerializer.class);
+               }
+       }
+
+       @Override
+       public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, 
KryoRegistration> kryoRegistrations) {
+               kryoRegistrations.put(
+                       GenericData.Array.class.getName(),
+                       new KryoRegistration(
+                               GenericData.Array.class,
+                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+       }
 
-       public static void addAvroSerializers(ExecutionConfig reg, Class<?> 
type) {
-               // Avro POJOs contain java.util.List which have 
GenericData.Array as their runtime type
-               // because Kryo is not able to serialize them properly, we use 
this serializer for them
-               reg.registerTypeWithKryoSerializer(GenericData.Array.class, 
Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
+       @Override
+       public <T> TypeSerializer<T> createAvroSerializer(Class<T> type) {
+               return new AvroSerializer<>(type);
+       }
 
-               // We register this serializer for users who want to use 
untyped Avro records (GenericData.Record).
-               // Kryo is able to serialize everything in there, except for 
the Schema.
-               // This serializer is very slow, but using the 
GenericData.Records of Kryo is in general a bad idea.
-               // we add the serializer as a default serializer because Avro 
is using a private sub-type at runtime.
-               reg.addDefaultKryoSerializer(Schema.class, 
AvroSchemaSerializer.class);
+       @Override
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       public <T> TypeInformation<T> createAvroTypeInfo(Class<T> type) {
+               // we have to be raw here because we cannot have "<T extends 
SpecificRecordBase>" in
+               // the interface of AvroUtils
+               return new AvroTypeInfo(type);
        }
 
        /**

Reply via email to