Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4943#discussion_r148924339
--- Diff:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
---
@@ -18,118 +18,111 @@
package org.apache.flink.formats.avro.typeutils;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
import
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.formats.avro.utils.DataInputDecoder;
import org.apache.flink.formats.avro.utils.DataOutputEncoder;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.avro.generic.GenericData;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
+import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * General purpose serialization. Currently using Apache Avro's
Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
+ * A serializer that serializes types via Avro.
*
- * @param <T> The type serialized.
+ * <p>The serializer supports both efficient specific record serialization
via for
--- End diff --
nit: "via for"
---