Repository: flink
Updated Branches:
  refs/heads/release-0.8 bab2d4992 -> 10f89c73b
[FLINK-1391] Register common Avro types at Kryo

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10f89c73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10f89c73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10f89c73

Branch: refs/heads/release-0.8
Commit: 10f89c73b708812727848010b99112fc67fa7e47
Parents: e7c9996
Author: Robert Metzger <rmetz...@apache.org>
Authored: Thu Feb 12 12:32:27 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Thu Feb 12 14:46:46 2015 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/runtime/KryoSerializer.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/10f89c73/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index b73f0b1..87ad0cf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -33,6 +33,7 @@ import com.twitter.chill.thrift.TBaseSerializer;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -232,8 +233,13 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 if(SpecificRecordBase.class.isAssignableFrom(type)) {
ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(type); this.kryo.register(type, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag)); - } + // register Avro types. + this.kryo.register(Utf8.class); + this.kryo.register(GenericData.EnumSymbol.class); + this.kryo.register(GenericData.Fixed.class); + this.kryo.register(GenericData.StringType.class); + // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type // because Kryo is not able to serialize them properly, we use this serializer for them this.kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer(ArrayList.class));