[hotfix] [avro] Improve Avro type hierarchy checks in AvroKryoSerializerUtils
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dd5e2b2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dd5e2b2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dd5e2b2 Branch: refs/heads/master Commit: 6dd5e2b272f717f6d97bb8dbf5f28c59d8c90151 Parents: a042ba9 Author: Stephan Ewen <[email protected]> Authored: Thu Nov 2 21:03:45 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Nov 3 16:40:35 2017 +0100 ---------------------------------------------------------------------- .../flink/formats/avro/utils/AvroKryoSerializerUtils.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6dd5e2b2/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java index c28f6cf..5744abc 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java @@ -37,8 +37,6 @@ import org.apache.avro.generic.GenericData; import java.io.Serializable; import java.util.LinkedHashMap; -import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass; - /** * Utilities for integrating Avro serializers in Kryo. */ @@ -46,7 +44,9 @@ public class AvroKryoSerializerUtils extends AvroUtils { @Override public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) { - if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || hasSuperclass(type, AVRO_GENERIC_RECORD)) { + if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(type) || + org.apache.avro.generic.GenericData.Record.class.isAssignableFrom(type)) { + // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type // because Kryo is not able to serialize them properly, we use this serializer for them reg.registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
