Repository: flink Updated Branches: refs/heads/master 88737cf9f -> c85f15ead
[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State This falls back to the original serializer (Pojo / Kryo) in cases where an old snapshot is resumed. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3a2197a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3a2197a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3a2197a Branch: refs/heads/master Commit: f3a2197a23524048200ae2b4712d6ed833208124 Parents: 88737cf Author: Stephan Ewen <[email protected]> Authored: Fri Nov 3 14:47:33 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Nov 6 18:56:47 2017 +0100 ---------------------------------------------------------------------- .../flink/api/java/typeutils/PojoTypeInfo.java | 6 +- .../flink/api/java/typeutils/AvroTypeInfo.java | 2 +- .../java/typeutils/runtime/AvroSerializer.java | 337 ++++++++++++++++ .../formats/avro/typeutils/AvroSerializer.java | 382 +++++++++++-------- .../formats/avro/typeutils/AvroTypeInfo.java | 91 +++-- .../BackwardsCompatibleAvroSerializer.java | 218 +++++++++++ .../avro/typeutils/AvroSerializerTest.java | 59 +++ .../avro/typeutils/AvroTypeExtractionTest.java | 14 +- .../avro/typeutils/AvroTypeInfoTest.java | 12 + .../BackwardsCompatibleAvroSerializerTest.java | 167 ++++++++ .../formats/avro/utils/TestDataGenerator.java | 120 ++++++ .../flink-1.3-avro-type-serialized-data | Bin 0 -> 23926 bytes .../flink-1.3-avro-type-serializer-snapshot | Bin 0 -> 48089 bytes 13 files changed, 1208 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 2e893bb..211b7ef 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -309,6 +309,10 @@ public class PojoTypeInfo<T> extends CompositeType<T> { return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass()); } + return createPojoSerializer(config); + } + + public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) { TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length]; Field[] reflectiveFields = new Field[fields.length]; @@ -319,7 +323,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> { return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config); } - + @Override public boolean equals(Object obj) { if (obj instanceof PojoTypeInfo) { http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java index 58085f6..03bacfa 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java @@ -33,6 +33,6 @@ import static org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateField public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> { public AvroTypeInfo(Class<T> typeClass) { - super(typeClass, generateFieldsFromAvroSchema(typeClass)); + super(typeClass, generateFieldsFromAvroSchema(typeClass, true)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java new file mode 100644 index 0000000..228e672 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.formats.avro.utils.DataInputDecoder; +import org.apache.flink.formats.avro.utils.DataOutputEncoder; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import com.esotericsoftware.kryo.Kryo; + +import org.apache.avro.generic.GenericData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.util.Utf8; + +import org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Old deprecated Avro serializer. It is retained for a smoother experience when + * upgrading from an earlier Flink savepoint that stored this serializer. + */ +@Internal +@Deprecated +@SuppressWarnings({"unused", "deprecation"}) +public final class AvroSerializer<T> extends TypeSerializer<T> { + + private static final long serialVersionUID = 1L; + + private final Class<T> type; + + private final Class<? extends T> typeToInstantiate; + + /** + * Map of class tag (using classname as tag) to their Kryo registration. + * + * <p>This map serves as a preview of the final registration result of + * the Kryo instance, taking into account registration overwrites. + */ + private LinkedHashMap<String, KryoRegistration> kryoRegistrations; + + private transient ReflectDatumWriter<T> writer; + private transient ReflectDatumReader<T> reader; + + private transient DataOutputEncoder encoder; + private transient DataInputDecoder decoder; + + private transient Kryo kryo; + + private transient T deepCopyInstance; + + // -------------------------------------------------------------------------------------------- + + public AvroSerializer(Class<T> type) { + this(type, type); + } + + public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) { + this.type = checkNotNull(type); + this.typeToInstantiate = checkNotNull(typeToInstantiate); + + InstantiationUtil.checkForInstantiation(typeToInstantiate); + + this.kryoRegistrations = buildKryoRegistrations(type); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public AvroSerializer<T> duplicate() { + return new AvroSerializer<>(type, typeToInstantiate); + } + + @Override + public T createInstance() { + return InstantiationUtil.instantiate(this.typeToInstantiate); + } + + @Override + public T copy(T from) { + checkKryoInitialized(); + + return KryoUtils.copy(from, kryo, this); + } + + @Override + public T copy(T from, T reuse) { + checkKryoInitialized(); + + return KryoUtils.copy(from, reuse, kryo, this); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(T value, DataOutputView target) throws IOException { + checkAvroInitialized(); + this.encoder.setOut(target); + this.writer.write(value, this.encoder); + } + + @Override + public T deserialize(DataInputView source) throws IOException { + checkAvroInitialized(); + this.decoder.setIn(source); + return this.reader.read(null, this.decoder); + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + checkAvroInitialized(); + this.decoder.setIn(source); + return this.reader.read(reuse, this.decoder); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + checkAvroInitialized(); + + if (this.deepCopyInstance == null) { + this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class); + } + + this.decoder.setIn(source); + this.encoder.setOut(target); + + T tmp = this.reader.read(this.deepCopyInstance, this.decoder); + this.writer.write(tmp, this.encoder); + } + + private void checkAvroInitialized() { + if (this.reader == null) { + this.reader = new ReflectDatumReader<>(type); + this.writer = new ReflectDatumWriter<>(type); + this.encoder = new DataOutputEncoder(); + this.decoder = new DataInputDecoder(); + } + } + + private void checkKryoInitialized() { + if (this.kryo == null) { + this.kryo = new Kryo(); + + Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); + instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.setInstantiatorStrategy(instantiatorStrategy); + + kryo.setAsmEnabled(true); + + KryoUtils.applyRegistrations(kryo, kryoRegistrations.values()); + } + } + + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof AvroSerializer) { + @SuppressWarnings("unchecked") + AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj; + + return avroSerializer.canEqual(this) && + type == avroSerializer.type && + typeToInstantiate == avroSerializer.typeToInstantiate; + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof AvroSerializer; + } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public AvroSerializerConfigSnapshot<T> snapshotConfiguration() { + return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations); + } + + @SuppressWarnings("unchecked") + @Override + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof AvroSerializerConfigSnapshot) { + final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot; + + if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) { + // resolve Kryo registrations; currently, since the Kryo registrations in Avro + // are fixed, there shouldn't be a problem with the resolution here. + + LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations(); + oldRegistrations.putAll(kryoRegistrations); + + for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) { + if (reconfiguredRegistrationEntry.getValue().isDummy()) { + return CompatibilityResult.requiresMigration(); + } + } + + this.kryoRegistrations = oldRegistrations; + return CompatibilityResult.compatible(); + } + } + + // ends up here if the preceding serializer is not + // the ValueSerializer, or serialized data type has changed + return CompatibilityResult.requiresMigration(); + } + + /** + * Config snapshot for this serializer. + */ + public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> { + + private static final int VERSION = 1; + + private Class<? extends T> typeToInstantiate; + + public AvroSerializerConfigSnapshot() {} + + public AvroSerializerConfigSnapshot( + Class<T> baseType, + Class<? extends T> typeToInstantiate, + LinkedHashMap<String, KryoRegistration> kryoRegistrations) { + + super(baseType, kryoRegistrations); + this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate); + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + + out.writeUTF(typeToInstantiate.getName()); + } + + @SuppressWarnings("unchecked") + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + + String classname = in.readUTF(); + try { + typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find requested class " + classname + " in classpath.", e); + } + } + + @Override + public int getVersion() { + return VERSION; + } + + public Class<? extends T> getTypeToInstantiate() { + return typeToInstantiate; + } + } + + // -------------------------------------------------------------------------------------------- + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + // kryoRegistrations may be null if this Avro serializer is deserialized from an old version + if (kryoRegistrations == null) { + this.kryoRegistrations = buildKryoRegistrations(type); + } + } + + private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) { + final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>(); + + // register Avro types. + registrations.put( + GenericData.Array.class.getName(), + new KryoRegistration( + GenericData.Array.class, + new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList()))); + registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class)); + registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class)); + registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class)); + registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class)); + + // register the serialized data type + registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType)); + + return registrations; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java index 02f74f5..bc3369f 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java @@ -18,85 +18,93 @@ package org.apache.flink.formats.avro.typeutils; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.KryoRegistration; import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.KryoUtils; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.formats.avro.utils.DataInputDecoder; import org.apache.flink.formats.avro.utils.DataOutputEncoder; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; -import com.esotericsoftware.kryo.Kryo; -import org.apache.avro.generic.GenericData; +import org.apache.avro.Schema; +import org.apache.avro.SchemaCompatibility; +import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType; +import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.util.Utf8; -import org.objenesis.strategy.StdInstantiatorStrategy; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.LinkedHashMap; -import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and - * Kryo for deep object copies. We want to change this to Kryo-only. + * A serializer that serializes types via Avro. * - * @param <T> The type serialized. + * <p>The serializer supports both efficient specific record serialization for + * types generated via Avro, as well as serialization via reflection + * (ReflectDatumReader / -Writer). The serializer instantiates them depending on + * the class of the type it should serialize. + * + * @param <T> The type to be serialized. */ -@Internal -public final class AvroSerializer<T> extends TypeSerializer<T> { +public class AvroSerializer<T> extends TypeSerializer<T> { private static final long serialVersionUID = 1L; - private final Class<T> type; + // -------- configuration fields, serializable ----------- - private final Class<? extends T> typeToInstantiate; + /** The class of the type that is serialized by this serializer. */ + private final Class<T> type; - /** - * Map of class tag (using classname as tag) to their Kryo registration. - * - * <p>This map serves as a preview of the final registration result of - * the Kryo instance, taking into account registration overwrites. - */ - private LinkedHashMap<String, KryoRegistration> kryoRegistrations; + // -------- runtime fields, non-serializable, lazily initialized ----------- - private transient ReflectDatumWriter<T> writer; - private transient ReflectDatumReader<T> reader; + private transient SpecificDatumWriter<T> writer; + private transient SpecificDatumReader<T> reader; private transient DataOutputEncoder encoder; private transient DataInputDecoder decoder; - private transient Kryo kryo; + private transient SpecificData avroData; - private transient T deepCopyInstance; + private transient Schema schema; - // -------------------------------------------------------------------------------------------- + /** The serializer configuration snapshot, cached for efficiency. */ + private transient AvroSchemaSerializerConfigSnapshot configSnapshot; + // ------------------------------------------------------------------------ + + /** + * Creates a new AvroSerializer for the type indicated by the given class. + */ public AvroSerializer(Class<T> type) { - this(type, type); + this.type = checkNotNull(type); } + /** + * @deprecated Use {@link AvroSerializer#AvroSerializer(Class)} instead. + */ + @Deprecated + @SuppressWarnings("unused") public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) { - this.type = checkNotNull(type); - this.typeToInstantiate = checkNotNull(typeToInstantiate); + this(type); + } - InstantiationUtil.checkForInstantiation(typeToInstantiate); + // ------------------------------------------------------------------------ - this.kryoRegistrations = buildKryoRegistrations(type); + public Class<T> getType() { + return type; } - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ @Override public boolean isImmutableType() { @@ -104,32 +112,17 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { } @Override - public AvroSerializer<T> duplicate() { - return new AvroSerializer<T>(type, typeToInstantiate); - } - - @Override - public T createInstance() { - return InstantiationUtil.instantiate(this.typeToInstantiate); - } - - @Override - public T copy(T from) { - checkKryoInitialized(); - - return KryoUtils.copy(from, kryo, this); + public int getLength() { + return -1; } - @Override - public T copy(T from, T reuse) { - checkKryoInitialized(); - - return KryoUtils.copy(from, reuse, kryo, this); - } + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ @Override - public int getLength() { - return -1; + public T createInstance() { + return InstantiationUtil.instantiate(type); } @Override @@ -153,111 +146,216 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { return this.reader.read(reuse, this.decoder); } + // ------------------------------------------------------------------------ + // Copying + // ------------------------------------------------------------------------ + @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { + public T copy(T from) { checkAvroInitialized(); + return avroData.deepCopy(schema, from); + } - if (this.deepCopyInstance == null) { - this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class); - } - - this.decoder.setIn(source); - this.encoder.setOut(target); + @Override + public T copy(T from, T reuse) { + return copy(from); + } - T tmp = this.reader.read(this.deepCopyInstance, this.decoder); - this.writer.write(tmp, this.encoder); + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + T value = deserialize(source); + serialize(value, target); } - private void checkAvroInitialized() { - if (this.reader == null) { - this.reader = new ReflectDatumReader<T>(type); - this.writer = new ReflectDatumWriter<T>(type); - this.encoder = new DataOutputEncoder(); - this.decoder = new DataInputDecoder(); + // ------------------------------------------------------------------------ + // Compatibility and Upgrades + // ------------------------------------------------------------------------ + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + if (configSnapshot == null) { + checkAvroInitialized(); + configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false)); } + return configSnapshot; } - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = new Kryo(); - - Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); - instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.setInstantiatorStrategy(instantiatorStrategy); + @Override + @SuppressWarnings("deprecation") + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) { + // proper schema snapshot, can do the sophisticated schema-based compatibility check + final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString(); + final Schema lastSchema = new Schema.Parser().parse(schemaString); - kryo.setAsmEnabled(true); + checkAvroInitialized(); + final SchemaPairCompatibility compatibility = + SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema); - KryoUtils.applyRegistrations(kryo, kryoRegistrations.values()); + return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ? + CompatibilityResult.compatible() : CompatibilityResult.requiresMigration(); + } + else if (configSnapshot instanceof AvroSerializerConfigSnapshot) { + // old snapshot case, just compare the type + // we don't need to restore any Kryo stuff, since Kryo was never used for persistence, + // only for object-to-object copies. + final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot; + return type.equals(old.getTypeClass()) ? + CompatibilityResult.compatible() : CompatibilityResult.requiresMigration(); + } + else { + return CompatibilityResult.requiresMigration(); } } - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public TypeSerializer<T> duplicate() { + return new AvroSerializer<>(type); + } @Override public int hashCode() { - return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode(); + return 42 + type.hashCode(); } @Override public boolean equals(Object obj) { - if (obj instanceof AvroSerializer) { - @SuppressWarnings("unchecked") - AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj; - - return avroSerializer.canEqual(this) && - type == avroSerializer.type && - typeToInstantiate == avroSerializer.typeToInstantiate; - } else { + if (obj == this) { + return true; + } + else if (obj != null && obj.getClass() == AvroSerializer.class) { + final AvroSerializer that = (AvroSerializer) obj; + return this.type == that.type; + } + else { return false; } } @Override public boolean canEqual(Object obj) { - return obj instanceof AvroSerializer; + return obj.getClass() == this.getClass(); } - // -------------------------------------------------------------------------------------------- - // Serializer configuration snapshotting & compatibility - // -------------------------------------------------------------------------------------------- - @Override - public AvroSerializerConfigSnapshot<T> snapshotConfiguration() { - return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations); + public String toString() { + return getClass().getName() + " (" + getType().getName() + ')'; } - @SuppressWarnings("unchecked") - @Override - public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof AvroSerializerConfigSnapshot) { - final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot; + // ------------------------------------------------------------------------ + // Initialization + // ------------------------------------------------------------------------ + + private void checkAvroInitialized() { + if (writer == null) { + initializeAvro(); + } + } + + private void initializeAvro() { + final ClassLoader cl = Thread.currentThread().getContextClassLoader(); + + if (SpecificRecord.class.isAssignableFrom(type)) { + this.avroData = new SpecificData(cl); + this.schema = this.avroData.getSchema(type); + this.reader = new SpecificDatumReader<>(schema, schema, avroData); + this.writer = new SpecificDatumWriter<>(schema, avroData); + } + else { + final ReflectData reflectData = new ReflectData(cl); + this.avroData = reflectData; + this.schema = this.avroData.getSchema(type); + this.reader = new ReflectDatumReader<>(schema, schema, reflectData); + this.writer = new ReflectDatumWriter<>(schema, reflectData); + } + + this.encoder = new DataOutputEncoder(); + this.decoder = new DataInputDecoder(); + } + + // ------------------------------------------------------------------------ + // Serializer Snapshots + // ------------------------------------------------------------------------ + + /** + * A config snapshot for the Avro Serializer that stores the Avro Schema to check compatibility. + */ + public static final class AvroSchemaSerializerConfigSnapshot extends TypeSerializerConfigSnapshot { + + private String schemaString; + + /** + * Default constructor for instantiation via reflection. + */ + @SuppressWarnings("unused") + public AvroSchemaSerializerConfigSnapshot() {} + + public AvroSchemaSerializerConfigSnapshot(String schemaString) { + this.schemaString = checkNotNull(schemaString); + } + + public String getSchemaString() { + return schemaString; + } + + // --- Serialization --- + + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + this.schemaString = in.readUTF(); + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + out.writeUTF(schemaString); + } - if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) { - // resolve Kryo registrations; currently, since the Kryo registrations in Avro - // are fixed, there shouldn't be a problem with the resolution here. + // --- Version --- - LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations(); - oldRegistrations.putAll(kryoRegistrations); + @Override + public int getVersion() { + return 1; + } - for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) { - if (reconfiguredRegistrationEntry.getValue().isDummy()) { - return CompatibilityResult.requiresMigration(); - } - } + // --- Utils --- - this.kryoRegistrations = oldRegistrations; - return CompatibilityResult.compatible(); + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + else if (obj != null && obj.getClass() == AvroSchemaSerializerConfigSnapshot.class) { + final AvroSchemaSerializerConfigSnapshot that = (AvroSchemaSerializerConfigSnapshot) obj; + return this.schemaString.equals(that.schemaString); + } + else { + return false; } } - // ends up here if the preceding serializer is not - // the ValueSerializer, or serialized data type has changed - return CompatibilityResult.requiresMigration(); + @Override + public int hashCode() { + return 11 + schemaString.hashCode(); + } + + @Override + public String toString() { + return getClass().getName() + " (" + schemaString + ')'; + } } /** - * {@link TypeSerializerConfigSnapshot} for Avro. + * The outdated config snapshot, retained for backwards compatibility. + * + * @deprecated The {@link AvroSchemaSerializerConfigSnapshot} should be used instead. */ + @Deprecated public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> { private static final int VERSION = 1; @@ -266,15 +364,6 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { public AvroSerializerConfigSnapshot() {} - public AvroSerializerConfigSnapshot( - Class<T> baseType, - Class<? extends T> typeToInstantiate, - LinkedHashMap<String, KryoRegistration> kryoRegistrations) { - - super(baseType, kryoRegistrations); - this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate); - } - @Override public void write(DataOutputView out) throws IOException { super.write(out); @@ -304,35 +393,4 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { return typeToInstantiate; } } - - // -------------------------------------------------------------------------------------------- - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - // kryoRegistrations may be null if this Avro serializer is deserialized from an old version - if (kryoRegistrations == null) { - this.kryoRegistrations = buildKryoRegistrations(type); - } - } - - private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) { - final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>(); - - // register Avro types. - registrations.put( - GenericData.Array.class.getName(), - new KryoRegistration( - GenericData.Array.class, - new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList()))); - registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class)); - registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class)); - registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class)); - registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class)); - - // register the serialized data type - registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType)); - - return registrations; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java index ad6b06e..644ee50 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java @@ -33,6 +33,7 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; /** * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs) @@ -49,43 +50,83 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> private static final long serialVersionUID = 1L; + private static final ConcurrentHashMap<Thread, Boolean> IN_BACKWARDS_COMPATIBLE_MODE = new ConcurrentHashMap<>(); + + private final boolean useBackwardsCompatibleSerializer; + + /** + * Creates a new Avro type info for the given class. + */ public AvroTypeInfo(Class<T> typeClass) { - super(typeClass, generateFieldsFromAvroSchema(typeClass)); + this(typeClass, false); + } + + /** + * Creates a new Avro type info for the given class. + * + * <p>This constructor takes a flag to specify whether a serializer + * that is backwards compatible with PoJo-style serialization of Avro types should be used. + * That is only necessary, if one has a Flink 1.3 (or earlier) savepoint where Avro types + * were stored in the checkpointed state. New Flink programs will never need this. + */ + public AvroTypeInfo(Class<T> typeClass, boolean useBackwardsCompatibleSerializer) { + super(typeClass, generateFieldsFromAvroSchema(typeClass, useBackwardsCompatibleSerializer)); + + final Boolean modeOnStack = IN_BACKWARDS_COMPATIBLE_MODE.get(Thread.currentThread()); + this.useBackwardsCompatibleSerializer = modeOnStack == null ? + useBackwardsCompatibleSerializer : modeOnStack; } @Override + @SuppressWarnings("deprecation") public TypeSerializer<T> createSerializer(ExecutionConfig config) { - return super.createSerializer(config); + return useBackwardsCompatibleSerializer ? + new BackwardsCompatibleAvroSerializer<>(getTypeClass()) : + new AvroSerializer<>(getTypeClass()); } @SuppressWarnings("unchecked") @Internal - public static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) { - PojoTypeExtractor pte = new PojoTypeExtractor(); - ArrayList<Type> typeHierarchy = new ArrayList<>(); - typeHierarchy.add(typeClass); - TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null); - - if (!(ti instanceof PojoTypeInfo)) { - throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); - } - PojoTypeInfo pti = (PojoTypeInfo) ti; - List<PojoField> newFields = new ArrayList<>(pti.getTotalFields()); - - for (int i = 0; i < pti.getArity(); i++) { - PojoField f = pti.getPojoFieldAt(i); - TypeInformation newType = f.getTypeInformation(); - // check if type is a CharSequence - if (newType instanceof GenericTypeInfo) { - if ((newType).getTypeClass().equals(CharSequence.class)) { - // replace the type by a org.apache.avro.util.Utf8 - newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class); + public static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema( + Class<T> typeClass, + boolean useBackwardsCompatibleSerializer) { + + final Thread currentThread = Thread.currentThread(); + final boolean entryPoint = + IN_BACKWARDS_COMPATIBLE_MODE.putIfAbsent(currentThread, useBackwardsCompatibleSerializer) == null; + + try { + PojoTypeExtractor pte = new PojoTypeExtractor(); + ArrayList<Type> typeHierarchy = new ArrayList<>(); + typeHierarchy.add(typeClass); + TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null); + + if (!(ti instanceof PojoTypeInfo)) { + throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); + } + PojoTypeInfo pti = (PojoTypeInfo) ti; + List<PojoField> newFields = new ArrayList<>(pti.getTotalFields()); + + for (int i = 0; i < pti.getArity(); i++) { + PojoField f = pti.getPojoFieldAt(i); + TypeInformation newType = f.getTypeInformation(); + // check if type is a CharSequence + if (newType instanceof GenericTypeInfo) { + if ((newType).getTypeClass().equals(CharSequence.class)) { + // replace the type by a org.apache.avro.util.Utf8 + newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class); + } } + PojoField newField = new PojoField(f.getField(), newType); + newFields.add(newField); + } + return newFields; + } + finally { + if (entryPoint) { + IN_BACKWARDS_COMPATIBLE_MODE.remove(currentThread); } - PojoField newField = new PojoField(f.getField(), newType); - newFields.add(newField); } - return newFields; } private static class PojoTypeExtractor extends TypeExtractor { http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java new file mode 100644 index 0000000..e5eb5d8 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot; +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot; + +import org.apache.avro.specific.SpecificRecordBase; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if + * it has to ensure compatibility with one of those. + * + * <p>This serializer is there only as a means to explicitly fall back to PoJo serialization + * in the case where an upgrade from an earlier savepoint was made. + * + * @param <T> The type to be serialized. + */ +@SuppressWarnings("deprecation") +public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> { + + private static final long serialVersionUID = 1L; + + /** The type to serialize. */ + private final Class<T> type; + + /** The type serializer currently used. Avro by default. */ + private TypeSerializer<T> serializer; + + /** + * Creates a new backwards-compatible Avro Serializer, for the given type. + */ + public BackwardsCompatibleAvroSerializer(Class<T> type) { + this.type = type; + this.serializer = new AvroSerializer<>(type); + } + + /** + * Private copy constructor. + */ + private BackwardsCompatibleAvroSerializer(Class<T> type, TypeSerializer<T> serializer) { + this.type = type; + this.serializer = serializer; + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + @Override + public boolean isImmutableType() { + return serializer.isImmutableType(); + } + + @Override + public int getLength() { + return serializer.getLength(); + } + + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ + + @Override + public T createInstance() { + return serializer.createInstance(); + } + + @Override + public void serialize(T value, DataOutputView target) throws IOException { + serializer.serialize(value, target); + } + + @Override + public T deserialize(DataInputView source) throws IOException { + return serializer.deserialize(source); + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + return serializer.deserialize(reuse, source); + } + + // ------------------------------------------------------------------------ + // Copying + // ------------------------------------------------------------------------ + + @Override + public T copy(T from) { + return serializer.copy(from); + } + + @Override + public T copy(T from, T reuse) { + return serializer.copy(from, reuse); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + serializer.copy(source, target); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public TypeSerializer<T> duplicate() { + return new BackwardsCompatibleAvroSerializer<>(type, serializer.duplicate()); + } + + @Override + public int hashCode() { + return type.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + else if (obj != null && obj.getClass() == BackwardsCompatibleAvroSerializer.class) { + final BackwardsCompatibleAvroSerializer that = (BackwardsCompatibleAvroSerializer) obj; + return this.type == that.type && this.serializer.equals(that.serializer); + } + else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj.getClass() == this.getClass(); + } + + @Override + public String toString() { + return getClass().getName() + " (" + type.getName() + ')'; + } + + // ------------------------------------------------------------------------ + // Configuration Snapshots and Upgrades + // ------------------------------------------------------------------------ + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + // we return the configuration of the actually used serializer here + return serializer.snapshotConfiguration(); + } + + @Override + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot || + configSnapshot instanceof AvroSerializerConfigSnapshot) { + + // avro serializer, nice :-) + checkState(serializer instanceof AvroSerializer, + "Serializer was changed backwards to PojoSerializer and now encounters AvroSerializer snapshot."); + + return serializer.ensureCompatibility(configSnapshot); + } + else if (configSnapshot instanceof PojoSerializerConfigSnapshot) { + // common previous case + checkState(SpecificRecordBase.class.isAssignableFrom(type), + "BackwardsCompatibleAvroSerializer resuming a state serialized " + + "via a PojoSerializer, but not for an Avro Specific Record"); + + final AvroTypeInfo<? extends SpecificRecordBase> typeInfo = + new AvroTypeInfo<>(type.asSubclass(SpecificRecordBase.class), true); + + @SuppressWarnings("unchecked") + final TypeSerializer<T> pojoSerializer = + (TypeSerializer<T>) typeInfo.createPojoSerializer(new ExecutionConfig()); + this.serializer = pojoSerializer; + return serializer.ensureCompatibility(configSnapshot); + } + else if (configSnapshot instanceof KryoRegistrationSerializerConfigSnapshot) { + // force-kryo old case common previous case + // we create a new Kryo Serializer with a blank execution config. + // registrations are anyways picked up from the snapshot. + serializer = new KryoSerializer<>(type, new ExecutionConfig()); + return serializer.ensureCompatibility(configSnapshot); + } + else { + // completely incompatible type, needs migration + return CompatibilityResult.requiresMigration(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java new file mode 100644 index 0000000..0ab5868 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.formats.avro.utils.TestDataGenerator; + +import java.util.Random; + +/** + * Tests for the {@link AvroSerializer} that test specific avro types. + */ +public class AvroSerializerTest extends SerializerTestBase<User> { + + @Override + protected TypeSerializer<User> createSerializer() { + return new AvroSerializer<>(User.class); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<User> getTypeClass() { + return User.class; + } + + @Override + protected User[] getTestData() { + final Random rnd = new Random(); + final User[] users = new User[20]; + + for (int i = 0; i < users.length; i++) { + users[i] = TestDataGenerator.generateRandomUser(rnd); + } + + return users; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java index ae41031..fbabb95 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java @@ -82,21 +82,14 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase { AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); DataSet<User> usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction<User, User>() { - @Override - public User map(User value) throws Exception { - value.setTypeMap(null); - return value; - } - }); + .map((value) -> value); usersDS.writeAsText(resultPath); env.execute("Simple Avro read job"); - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; + expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; } @Test @@ -107,7 +100,6 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase { AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); DataSet<User> usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) .map(new MapFunction<User, User>() { @Override public User map(User value) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java index 79a4a45..371cd4f 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java @@ -18,10 +18,16 @@ package org.apache.flink.formats.avro.typeutils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeInformationTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.User; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + /** * Test for {@link AvroTypeInfo}. */ @@ -34,4 +40,10 @@ public class AvroTypeInfoTest extends TypeInformationTestBase<AvroTypeInfo<?>> { new AvroTypeInfo<>(User.class), }; } + + @Test + public void testAvroByDefault() { + final TypeSerializer<User> serializer = new AvroTypeInfo<>(User.class).createSerializer(new ExecutionConfig()); + assertTrue(serializer instanceof AvroSerializer); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java new file mode 100644 index 0000000..92395ba --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.formats.avro.utils.TestDataGenerator; + +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * This test ensures that state and state configuration created by Flink 1.3 Avro types + * that used the PojoSerializer still works. + * + * <p><b>Important:</b> Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3) + * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already. + * This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types) + * works properly. + * + * <p>This test can be dropped once we drop backwards compatibility with Flink 1.3 snapshots. + */ +public class BackwardsCompatibleAvroSerializerTest { + + private static final String SNAPSHOT_RESOURCE = "flink-1.3-avro-type-serializer-snapshot"; + + private static final String DATA_RESOURCE = "flink-1.3-avro-type-serialized-data"; + + @SuppressWarnings("unused") + private static final String SNAPSHOT_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + SNAPSHOT_RESOURCE; + + @SuppressWarnings("unused") + private static final String DATA_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + DATA_RESOURCE; + + private static final long RANDOM_SEED = 143065108437678L; + + private static final int NUM_DATA_ENTRIES = 20; + + @Test + public void testCompatibilityWithFlink_1_3() throws Exception { + + // retrieve the old config snapshot + + final TypeSerializer<User> serializer; + final TypeSerializerConfigSnapshot configSnapshot; + + try (InputStream in = getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) { + DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in); + + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> deserialized = + TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience( + inView, getClass().getClassLoader()); + + assertEquals(1, deserialized.size()); + + @SuppressWarnings("unchecked") + final TypeSerializer<User> typedSerializer = (TypeSerializer<User>) deserialized.get(0).f0; + + serializer = typedSerializer; + configSnapshot = deserialized.get(0).f1; + } + + assertNotNull(serializer); + assertNotNull(configSnapshot); + + assertTrue(serializer instanceof PojoSerializer); + assertTrue(configSnapshot instanceof PojoSerializerConfigSnapshot); + + // sanity check for the test: check that the test data works with the original serializer + validateDeserialization(serializer); + + // sanity check for the test: check that a PoJoSerializer and the original serializer work together + assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration()); + + final TypeSerializer<User> newSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig()); + assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration()); + + // deserialize the data and make sure this still works + validateDeserialization(newSerializer); + + TypeSerializerConfigSnapshot nextSnapshot = newSerializer.snapshotConfiguration(); + final TypeSerializer<User> nextSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig()); + + assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration()); + + // deserialize the data and make sure this still works + validateDeserialization(nextSerializer); + } + + private static void validateDeserialization(TypeSerializer<User> serializer) throws IOException { + final Random rnd = new Random(RANDOM_SEED); + + try (InputStream in = BackwardsCompatibleAvroSerializerTest.class.getClassLoader() + .getResourceAsStream(DATA_RESOURCE)) { + + final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in); + + for (int i = 0; i < NUM_DATA_ENTRIES; i++) { + final User deserialized = serializer.deserialize(inView); + + // deterministically generate a reference record + final User reference = TestDataGenerator.generateRandomUser(rnd); + + assertEquals(reference, deserialized); + } + } + } + +// run this code on a 1.3 (or earlier) branch to generate the test data +// public static void main(String[] args) throws Exception { +// +// AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class); +// +// TypeSerializer<User> serializer = typeInfo.createPojoSerializer(new ExecutionConfig()); +// TypeSerializerConfigSnapshot confSnapshot = serializer.snapshotConfiguration(); +// +// try (FileOutputStream fos = new FileOutputStream(SNAPSHOT_RESOURCE_WRITER)) { +// DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(fos); +// +// TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( +// out, +// Collections.singletonList( +// new Tuple2<>(serializer, confSnapshot))); +// } +// +// try (FileOutputStream fos = new FileOutputStream(DATA_RESOURCE_WRITER)) { +// final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(fos); +// final Random rnd = new Random(RANDOM_SEED); +// +// for (int i = 0; i < NUM_DATA_ENTRIES; i++) { +// serializer.serialize(TestDataGenerator.generateRandomUser(rnd), out); +// } +// } +// } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java new file mode 100644 index 0000000..9a9061e --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.utils; + +import org.apache.flink.formats.avro.generated.Address; +import org.apache.flink.formats.avro.generated.Colors; +import org.apache.flink.formats.avro.generated.Fixed16; +import org.apache.flink.formats.avro.generated.User; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Random; + +/** + * Generator for random test data for the generated Avro User type. + */ +public class TestDataGenerator { + + public static User generateRandomUser(Random rnd) { + return new User( + generateRandomString(rnd, 50), + rnd.nextBoolean() ? null : rnd.nextInt(), + rnd.nextBoolean() ? null : generateRandomString(rnd, 6), + rnd.nextBoolean() ? null : rnd.nextLong(), + rnd.nextDouble(), + null, + rnd.nextBoolean(), + generateRandomStringList(rnd, 20, 30), + generateRandomBooleanList(rnd, 20), + rnd.nextBoolean() ? null : generateRandomStringList(rnd, 20, 20), + generateRandomColor(rnd), + new HashMap<>(), + generateRandomFixed16(rnd), + generateRandomUnion(rnd), + generateRandomAddress(rnd)); + } + + public static Colors generateRandomColor(Random rnd) { + return Colors.values()[rnd.nextInt(Colors.values().length)]; + } + + public static Fixed16 generateRandomFixed16(Random rnd) { + if (rnd.nextBoolean()) { + return new Fixed16(); + } + else { + byte[] bytes = new byte[16]; + rnd.nextBytes(bytes); + return new Fixed16(bytes); + } + } + + public static Address generateRandomAddress(Random rnd) { + return new Address( + rnd.nextInt(), + generateRandomString(rnd, 20), + generateRandomString(rnd, 20), + generateRandomString(rnd, 20), + generateRandomString(rnd, 20)); + } + + private static List<Boolean> generateRandomBooleanList(Random rnd, int maxEntries) { + final int num = rnd.nextInt(maxEntries + 1); + ArrayList<Boolean> list = new ArrayList<>(); + for (int i = 0; i < num; i++) { + list.add(rnd.nextBoolean()); + } + return list; + } + + private static List<CharSequence> generateRandomStringList(Random rnd, int maxEntries, int maxLen) { + final int num = rnd.nextInt(maxEntries + 1); + ArrayList<CharSequence> list = new ArrayList<>(); + for (int i = 0; i < num; i++) { + list.add(generateRandomString(rnd, maxLen)); + } + return list; + } + + private static String generateRandomString(Random rnd, int maxLen) { + char[] chars = new char[rnd.nextInt(maxLen + 1)]; + for (int i = 0; i < chars.length; i++) { + chars[i] = (char) rnd.nextInt(Character.MAX_VALUE); + } + return new String(chars); + } + + private static Object generateRandomUnion(Random rnd) { + if (rnd.nextBoolean()) { + if (rnd.nextBoolean()) { + return null; + } else { + return rnd.nextBoolean(); + } + } else { + if (rnd.nextBoolean()) { + return rnd.nextLong(); + } else { + return rnd.nextDouble(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data new file mode 100644 index 0000000..028c1e6 Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data differ http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot new file mode 100644 index 0000000..5bfdf728 Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot differ
