[
https://issues.apache.org/jira/browse/FLINK-10605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668745#comment-16668745
]
ASF GitHub Bot commented on FLINK-10605:
----------------------------------------
asfgit closed pull request #6881: [FLINK-10605] [core] Upgrade AvroSerializer
snapshot to implement new TypeSerializerSnapshot interface
URL: https://github.com/apache/flink/pull/6881
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
deleted file mode 100644
index 03bacfab134..00000000000
---
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.annotation.Public;
-
-import org.apache.avro.specific.SpecificRecordBase;
-
-import static
org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema;
-
-/**
- * @deprecated Please use
<code>org.apache.flink.formats.avro.typeutils.AvroTypeInfo</code>
- * in the <code>flink-avro</code> module. This class will be removed in the
near future.
- */
-@Deprecated
-@Public
-public class AvroTypeInfo<T extends SpecificRecordBase> extends
PojoTypeInfo<T> {
-
- public AvroTypeInfo(Class<T> typeClass) {
- super(typeClass, generateFieldsFromAvroSchema(typeClass, true));
- }
-}
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
deleted file mode 100644
index 5f76b094361..00000000000
---
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.formats.avro.utils.DataInputDecoder;
-import org.apache.flink.formats.avro.utils.DataOutputEncoder;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Old deprecated Avro serializer. It is retained for a smoother experience
when
- * upgrading from an earlier Flink savepoint that stored this serializer.
- */
-@Internal
-@Deprecated
-@SuppressWarnings({"unused", "deprecation"})
-public final class AvroSerializer<T> extends TypeSerializer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private final Class<T> type;
-
- private final Class<? extends T> typeToInstantiate;
-
- /**
- * Map of class tag (using classname as tag) to their Kryo registration.
- *
- * <p>This map serves as a preview of the final registration result of
- * the Kryo instance, taking into account registration overwrites.
- */
- private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
-
- private transient ReflectDatumWriter<T> writer;
- private transient ReflectDatumReader<T> reader;
-
- private transient DataOutputEncoder encoder;
- private transient DataInputDecoder decoder;
-
- private transient Kryo kryo;
-
- private transient T deepCopyInstance;
-
- //
--------------------------------------------------------------------------------------------
-
- public AvroSerializer(Class<T> type) {
- this(type, type);
- }
-
- public AvroSerializer(Class<T> type, Class<? extends T>
typeToInstantiate) {
- this.type = checkNotNull(type);
- this.typeToInstantiate = checkNotNull(typeToInstantiate);
-
- InstantiationUtil.checkForInstantiation(typeToInstantiate);
-
- this.kryoRegistrations = buildKryoRegistrations(type);
- }
-
- //
--------------------------------------------------------------------------------------------
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public AvroSerializer<T> duplicate() {
- return new AvroSerializer<>(type, typeToInstantiate);
- }
-
- @Override
- public T createInstance() {
- return InstantiationUtil.instantiate(this.typeToInstantiate);
- }
-
- @Override
- public T copy(T from) {
- checkKryoInitialized();
-
- return KryoUtils.copy(from, kryo, this);
- }
-
- @Override
- public T copy(T from, T reuse) {
- checkKryoInitialized();
-
- return KryoUtils.copy(from, reuse, kryo, this);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(T value, DataOutputView target) throws
IOException {
- checkAvroInitialized();
- this.encoder.setOut(target);
- this.writer.write(value, this.encoder);
- }
-
- @Override
- public T deserialize(DataInputView source) throws IOException {
- checkAvroInitialized();
- this.decoder.setIn(source);
- return this.reader.read(null, this.decoder);
- }
-
- @Override
- public T deserialize(T reuse, DataInputView source) throws IOException {
- checkAvroInitialized();
- this.decoder.setIn(source);
- return this.reader.read(reuse, this.decoder);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws
IOException {
- checkAvroInitialized();
-
- if (this.deepCopyInstance == null) {
- this.deepCopyInstance =
InstantiationUtil.instantiate(type, Object.class);
- }
-
- this.decoder.setIn(source);
- this.encoder.setOut(target);
-
- T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
- this.writer.write(tmp, this.encoder);
- }
-
- private void checkAvroInitialized() {
- if (this.reader == null) {
- this.reader = new ReflectDatumReader<>(type);
- this.writer = new ReflectDatumWriter<>(type);
- this.encoder = new DataOutputEncoder();
- this.decoder = new DataInputDecoder();
- }
- }
-
- private void checkKryoInitialized() {
- if (this.kryo == null) {
- this.kryo = new Kryo();
-
- Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
new Kryo.DefaultInstantiatorStrategy();
-
instantiatorStrategy.setFallbackInstantiatorStrategy(new
StdInstantiatorStrategy());
- kryo.setInstantiatorStrategy(instantiatorStrategy);
-
- kryo.setAsmEnabled(true);
-
- KryoUtils.applyRegistrations(kryo,
kryoRegistrations.values());
- }
- }
-
- //
--------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return 31 * this.type.hashCode() +
this.typeToInstantiate.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof AvroSerializer) {
- @SuppressWarnings("unchecked")
- AvroSerializer<T> avroSerializer = (AvroSerializer<T>)
obj;
-
- return avroSerializer.canEqual(this) &&
- type == avroSerializer.type &&
- typeToInstantiate ==
avroSerializer.typeToInstantiate;
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof AvroSerializer;
- }
-
- //
--------------------------------------------------------------------------------------------
- // Serializer configuration snapshotting & compatibility
- //
--------------------------------------------------------------------------------------------
-
- @Override
- public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
- return new AvroSerializerConfigSnapshot<>(type,
typeToInstantiate, kryoRegistrations);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public CompatibilityResult<T>
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
- if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
- final AvroSerializerConfigSnapshot<T> config =
(AvroSerializerConfigSnapshot<T>) configSnapshot;
-
- if (type.equals(config.getTypeClass()) &&
typeToInstantiate.equals(config.getTypeToInstantiate())) {
- // resolve Kryo registrations; currently, since
the Kryo registrations in Avro
- // are fixed, there shouldn't be a problem with
the resolution here.
-
- LinkedHashMap<String, KryoRegistration>
oldRegistrations = config.getKryoRegistrations();
- oldRegistrations.putAll(kryoRegistrations);
-
- for (Map.Entry<String, KryoRegistration>
reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
- if
(reconfiguredRegistrationEntry.getValue().isDummy()) {
- return
CompatibilityResult.requiresMigration();
- }
- }
-
- this.kryoRegistrations = oldRegistrations;
- return CompatibilityResult.compatible();
- }
- }
-
- // ends up here if the preceding serializer is not
- // the ValueSerializer, or serialized data type has changed
- return CompatibilityResult.requiresMigration();
- }
-
- /**
- * Config snapshot for this serializer.
- */
- public static class AvroSerializerConfigSnapshot<T> extends
KryoRegistrationSerializerConfigSnapshot<T> {
-
- private static final int VERSION = 1;
-
- private Class<? extends T> typeToInstantiate;
-
- public AvroSerializerConfigSnapshot() {}
-
- public AvroSerializerConfigSnapshot(
- Class<T> baseType,
- Class<? extends T> typeToInstantiate,
- LinkedHashMap<String, KryoRegistration>
kryoRegistrations) {
-
- super(baseType, kryoRegistrations);
- this.typeToInstantiate =
Preconditions.checkNotNull(typeToInstantiate);
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- super.write(out);
-
- out.writeUTF(typeToInstantiate.getName());
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void read(DataInputView in) throws IOException {
- super.read(in);
-
- String classname = in.readUTF();
- try {
- typeToInstantiate = (Class<? extends T>)
Class.forName(classname, true, getUserCodeClassLoader());
- } catch (ClassNotFoundException e) {
- throw new IOException("Cannot find requested
class " + classname + " in classpath.", e);
- }
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
-
- public Class<? extends T> getTypeToInstantiate() {
- return typeToInstantiate;
- }
- }
-
- //
--------------------------------------------------------------------------------------------
-
- private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
- in.defaultReadObject();
-
- // kryoRegistrations may be null if this Avro serializer is
deserialized from an old version
- if (kryoRegistrations == null) {
- this.kryoRegistrations = buildKryoRegistrations(type);
- }
- }
-
- private static <T> LinkedHashMap<String, KryoRegistration>
buildKryoRegistrations(Class<T> serializedDataType) {
- final LinkedHashMap<String, KryoRegistration> registrations =
new LinkedHashMap<>();
-
- // register Avro types.
- registrations.put(
- GenericData.Array.class.getName(),
- new KryoRegistration(
- GenericData.Array.class,
- new
ExecutionConfig.SerializableSerializer<>(new
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
- registrations.put(Utf8.class.getName(), new
KryoRegistration(Utf8.class));
- registrations.put(GenericData.EnumSymbol.class.getName(), new
KryoRegistration(GenericData.EnumSymbol.class));
- registrations.put(GenericData.Fixed.class.getName(), new
KryoRegistration(GenericData.Fixed.class));
- registrations.put(GenericData.StringType.class.getName(), new
KryoRegistration(GenericData.StringType.class));
-
- // register the serialized data type
- registrations.put(serializedDataType.getName(), new
KryoRegistration(serializedDataType));
-
- return registrations;
- }
-}
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
index 2d06476d1c5..a9bdcee8bb5 100644
---
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
+++
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
@@ -164,7 +164,7 @@ public boolean isEndOfStream(T nextElement) {
@SuppressWarnings("unchecked")
public TypeInformation<T> getProducedType() {
if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
- return new AvroTypeInfo(recordClazz, false);
+ return new AvroTypeInfo(recordClazz);
} else {
return (TypeInformation<T>) new
GenericRecordAvroTypeInfo(this.reader);
}
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
new file mode 100644
index 00000000000..0ca25bfaa04
--- /dev/null
+++
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.avro.utils.DataInputDecoder;
+import org.apache.flink.formats.avro.utils.DataOutputEncoder;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Creates Avro {@link DatumReader} and {@link DatumWriter}.
+ *
+ * @param <T> The type to be serialized.
+ */
+@Internal
+final class AvroFactory<T> {
+
+ private final DataOutputEncoder encoder = new DataOutputEncoder();
+ private final DataInputDecoder decoder = new DataInputDecoder();
+
+ private final GenericData avroData;
+ private final Schema schema;
+ private final DatumWriter<T> writer;
+ private final DatumReader<T> reader;
+
+ /**
+ * Creates Avro Writer and Reader for a specific type.
+ *
+ * <p>Given an input type, and possibly the current schema, and a
previously known schema (also known as writer
+ * schema) create will deduce the best way to initialize a reader and
writer according to the following rules:
+ * <ul>
+ * <li>If type is an Avro generated class (a {@link SpecificRecord})
then the reader would use the
+ * previousSchema for reading (if present) otherwise it would use the
schema attached to the auto generated
+ * class.
+ * <li>If the type is a GenericRecord then the reader and the writer
would be created with the supplied
+ * (mandatory) schema.
+ * <li>Otherwise, we use Avro's reflection based reader and writer that
would deduce the schema via reflection.
+ * If the previous schema is also present (when restoring a serializer
for example) then the reader would be
+ * created with both schemas.
+ * </ul>
+ */
+ static <T> AvroFactory<T> create(Class<T> type, @Nullable Schema
currentSchema, @Nullable Schema previousSchema) {
+ final ClassLoader cl =
Thread.currentThread().getContextClassLoader();
+
+ if (SpecificRecord.class.isAssignableFrom(type)) {
+ return fromSpecific(type, cl,
Optional.ofNullable(previousSchema));
+ }
+ if (GenericRecord.class.isAssignableFrom(type)) {
+ return fromGeneric(cl, currentSchema);
+ }
+ return fromReflective(type, cl,
Optional.ofNullable(previousSchema));
+ }
+
+ static <T> AvroFactory<T> createFromTypeAndSchemaString(Class<T> type,
@Nullable String schemaString) {
+ Schema schema = (schemaString != null) ? new
Schema.Parser().parse(schemaString) : null;
+ return create(type, schema, null);
+ }
+
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ private static <T> AvroFactory<T> fromSpecific(Class<T> type,
ClassLoader cl, Optional<Schema> previousSchema) {
+ SpecificData specificData = new SpecificData(cl);
+ Schema newSchema = specificData.getSchema(type);
+
+ return new AvroFactory<>(
+ specificData,
+ newSchema,
+ new
SpecificDatumReader<>(previousSchema.orElse(newSchema), newSchema,
specificData),
+ new SpecificDatumWriter<>(newSchema, specificData)
+ );
+ }
+
+ private static <T> AvroFactory<T> fromGeneric(ClassLoader cl, Schema
schema) {
+ checkNotNull(schema,
+ "Unable to create an AvroSerializer with a
GenericRecord type without a schema");
+ GenericData genericData = new GenericData(cl);
+
+ return new AvroFactory<>(
+ genericData,
+ schema,
+ new GenericDatumReader<>(schema, schema, genericData),
+ new GenericDatumWriter<>(schema, genericData)
+ );
+ }
+
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ private static <T> AvroFactory<T> fromReflective(Class<T> type,
ClassLoader cl, Optional<Schema> previousSchema) {
+ ReflectData reflectData = new ReflectData(cl);
+ Schema newSchema = reflectData.getSchema(type);
+
+ return new AvroFactory<>(
+ reflectData,
+ newSchema,
+ new
ReflectDatumReader<>(previousSchema.orElse(newSchema), newSchema, reflectData),
+ new ReflectDatumWriter<>(newSchema, reflectData)
+ );
+ }
+
+ private AvroFactory(
+ GenericData avroData,
+ Schema schema,
+ DatumReader<T> reader,
+ DatumWriter<T> writer) {
+
+ this.avroData = checkNotNull(avroData);
+ this.schema = checkNotNull(schema);
+ this.writer = checkNotNull(writer);
+ this.reader = checkNotNull(reader);
+ }
+
+ DataOutputEncoder getEncoder() {
+ return encoder;
+ }
+
+ DataInputDecoder getDecoder() {
+ return decoder;
+ }
+
+ Schema getSchema() {
+ return schema;
+ }
+
+ DatumWriter<T> getWriter() {
+ return writer;
+ }
+
+ DatumReader<T> getReader() {
+ return reader;
+ }
+
+ GenericData getAvroData() {
+ return avroData;
+ }
+}
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index dad1d6df16a..37aa2d3bc44 100644
---
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -18,10 +18,11 @@
package org.apache.flink.formats.avro.typeutils;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.formats.avro.utils.DataInputDecoder;
@@ -33,15 +34,10 @@
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.Nullable;
import org.apache.avro.specific.SpecificRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,29 +74,34 @@
* Because this flag is static final, a value of 'false' allows the JIT
compiler to eliminate
* the guarded code sections. */
private static final boolean CONCURRENT_ACCESS_CHECK =
- LOG.isDebugEnabled() ||
AvroSerializerDebugInitHelper.setToDebug;
+ LOG.isDebugEnabled() ||
AvroSerializerDebugInitHelper.setToDebug;
// -------- configuration fields, serializable -----------
/** The class of the type that is serialized by this serializer. */
private final Class<T> type;
+ private final SerializableAvroSchema schema;
+ private final SerializableAvroSchema previousSchema;
- private final String schemaString;
+ /** This field was present in this class prior to 1.7, and held the
string representation of
+ * a {@link Schema} (only in the case of an Avro GenericRecord). Since
{@code FsStateBackend} stores the serializer
+ * (via Java serialization) within the checkpoint to later restore the
state, we need to have this field here.
+ * See {@link #initializeAvro()}.
+ */
+ @Deprecated
+ private final String schemaString = null;
// -------- runtime fields, non-serializable, lazily initialized
-----------
- private transient GenericDatumWriter<T> writer;
- private transient GenericDatumReader<T> reader;
-
+ private transient GenericData avroData;
+ private transient DatumWriter<T> writer;
private transient DataOutputEncoder encoder;
private transient DataInputDecoder decoder;
-
- private transient GenericData avroData;
-
- private transient Schema schema;
+ private transient DatumReader<T> reader;
+ private transient Schema runtimeSchema;
/** The serializer configuration snapshot, cached for efficiency. */
- private transient AvroSchemaSerializerConfigSnapshot<T> configSnapshot;
+ private transient TypeSerializerSnapshot<T> configSnapshot;
/** The currently accessing thread, set and checked on debug level
only. */
private transient volatile Thread currentThread;
@@ -113,10 +114,9 @@
* For serializing {@link GenericData.Record} use {@link
AvroSerializer#AvroSerializer(Class, Schema)}
*/
public AvroSerializer(Class<T> type) {
+ this(checkNotNull(type), new SerializableAvroSchema(), new
SerializableAvroSchema());
checkArgument(!isGenericRecord(type),
"For GenericData.Record use constructor with explicit
schema.");
- this.type = checkNotNull(type);
- this.schemaString = null;
}
/**
@@ -126,11 +126,19 @@ public AvroSerializer(Class<T> type) {
* {@link AvroSerializer#AvroSerializer(Class)}
*/
public AvroSerializer(Class<T> type, Schema schema) {
+ this(checkNotNull(type), new
SerializableAvroSchema(checkNotNull(schema)), new SerializableAvroSchema());
checkArgument(isGenericRecord(type),
"For classes other than GenericData.Record use
constructor without explicit schema.");
+ }
+
+ /**
+ * Creates a new AvroSerializer for the type indicated by the given
class.
+ */
+ @Internal
+ AvroSerializer(Class<T> type, @Nullable SerializableAvroSchema
newSchema, @Nullable SerializableAvroSchema previousSchema) {
this.type = checkNotNull(type);
- this.schema = checkNotNull(schema);
- this.schemaString = schema.toString();
+ this.schema = newSchema;
+ this.previousSchema = previousSchema;
}
/**
@@ -167,7 +175,7 @@ public int getLength() {
//
------------------------------------------------------------------------
@Override
- public T createInstance() {
+ public T createInstance() {
return InstantiationUtil.instantiate(type);
}
@@ -237,7 +245,7 @@ public T copy(T from) {
try {
checkAvroInitialized();
- return avroData.deepCopy(schema, from);
+ return avroData.deepCopy(runtimeSchema, from);
}
finally {
if (CONCURRENT_ACCESS_CHECK) {
@@ -264,10 +272,10 @@ public void copy(DataInputView source, DataOutputView
target) throws IOException
//
------------------------------------------------------------------------
@Override
- public TypeSerializerConfigSnapshot<T> snapshotConfiguration() {
+ public TypeSerializerSnapshot<T> snapshotConfiguration() {
if (configSnapshot == null) {
checkAvroInitialized();
- configSnapshot = new
AvroSchemaSerializerConfigSnapshot<>(schema.toString(false));
+ configSnapshot = new
AvroSerializerSnapshot<>(runtimeSchema, type);
}
return configSnapshot;
}
@@ -282,18 +290,10 @@ public void copy(DataInputView source, DataOutputView
target) throws IOException
checkAvroInitialized();
final SchemaPairCompatibility compatibility =
-
SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
+
SchemaCompatibility.checkReaderWriterCompatibility(runtimeSchema, lastSchema);
return compatibility.getType() ==
SchemaCompatibilityType.COMPATIBLE ?
- CompatibilityResult.compatible() :
CompatibilityResult.requiresMigration();
- }
- else if (configSnapshot instanceof
AvroSerializerConfigSnapshot) {
- // old snapshot case, just compare the type
- // we don't need to restore any Kryo stuff, since Kryo
was never used for persistence,
- // only for object-to-object copies.
- final AvroSerializerConfigSnapshot<T> old =
(AvroSerializerConfigSnapshot<T>) configSnapshot;
- return type.equals(old.getTypeClass()) ?
- CompatibilityResult.compatible() :
CompatibilityResult.requiresMigration();
+ CompatibilityResult.compatible() :
CompatibilityResult.requiresMigration();
}
else {
return CompatibilityResult.requiresMigration();
@@ -304,19 +304,15 @@ else if (configSnapshot instanceof
AvroSerializerConfigSnapshot) {
// Utilities
//
------------------------------------------------------------------------
- private static boolean isGenericRecord(Class<?> type) {
+ static boolean isGenericRecord(Class<?> type) {
return !SpecificRecord.class.isAssignableFrom(type) &&
GenericRecord.class.isAssignableFrom(type);
}
@Override
public TypeSerializer<T> duplicate() {
- if (schemaString != null) {
- return new AvroSerializer<>(type, schema);
- } else {
- return new AvroSerializer<>(type);
-
- }
+ checkAvroInitialized();
+ return new AvroSerializer<>(type, new
SerializableAvroSchema(runtimeSchema), previousSchema);
}
@Override
@@ -359,32 +355,26 @@ private void checkAvroInitialized() {
}
private void initializeAvro() {
- final ClassLoader cl =
Thread.currentThread().getContextClassLoader();
-
- if (SpecificRecord.class.isAssignableFrom(type)) {
- SpecificData specificData = new SpecificData(cl);
- this.avroData = specificData;
- this.schema = specificData.getSchema(type);
- this.reader = new SpecificDatumReader<>(schema, schema,
specificData);
- this.writer = new SpecificDatumWriter<>(schema,
specificData);
- } else if (GenericRecord.class.isAssignableFrom(type)) {
- if (schema == null) {
- this.schema = new
Schema.Parser().parse(schemaString);
- }
- GenericData genericData = new GenericData(cl);
- this.avroData = genericData;
- this.reader = new GenericDatumReader<>(schema, schema,
genericData);
- this.writer = new GenericDatumWriter<>(schema,
genericData);
- } else {
- final ReflectData reflectData = new ReflectData(cl);
- this.avroData = reflectData;
- this.schema = reflectData.getSchema(type);
- this.reader = new ReflectDatumReader<>(schema, schema,
reflectData);
- this.writer = new ReflectDatumWriter<>(schema,
reflectData);
- }
-
- this.encoder = new DataOutputEncoder();
- this.decoder = new DataInputDecoder();
+ final AvroFactory<T> factory;
+ if (wasThisInstanceDeserializedFromAPre17Version()) {
+ // since schema is a final field that is initialized to
a non null value,
+ // this can only have happened when restoring from a
checkpoint in an FsStateBackend pre Flink 1.7.
+ // To maintain backwards compatibility we need to use
the information stored at schemaString.
+ factory =
AvroFactory.createFromTypeAndSchemaString(type, schemaString);
+ }
+ else {
+ factory = AvroFactory.create(type,
schema.getAvroSchema(), previousSchema.getAvroSchema());
+ }
+ this.runtimeSchema = factory.getSchema();
+ this.writer = factory.getWriter();
+ this.reader = factory.getReader();
+ this.encoder = factory.getEncoder();
+ this.decoder = factory.getDecoder();
+ this.avroData = factory.getAvroData();
+ }
+
+ private boolean wasThisInstanceDeserializedFromAPre17Version() {
+ return (schema == null);
}
//
--------------------------------------------------------------------------------------------
@@ -403,8 +393,8 @@ private void enterExclusiveThread() {
}
else if (previous != thisThread) {
throw new IllegalStateException(
- "Concurrent access to KryoSerializer.
Thread 1: " + thisThread.getName() +
- " , Thread 2: " +
previous.getName());
+ "Concurrent access to KryoSerializer. Thread 1:
" + thisThread.getName() +
+ " , Thread 2: " + previous.getName());
}
}
@@ -412,13 +402,20 @@ private void exitExclusiveThread() {
currentThread = null;
}
+ Schema getAvroSchema() {
+ checkAvroInitialized();
+ return runtimeSchema;
+ }
+
//
------------------------------------------------------------------------
// Serializer Snapshots
//
------------------------------------------------------------------------
/**
* A config snapshot for the Avro Serializer that stores the Avro
Schema to check compatibility.
+ * This class is now deprecated and only kept for backward
compatibility.
*/
+ @Deprecated
public static final class AvroSchemaSerializerConfigSnapshot<T> extends
TypeSerializerConfigSnapshot<T> {
private String schemaString;
@@ -427,8 +424,13 @@ private void exitExclusiveThread() {
* Default constructor for instantiation via reflection.
*/
@SuppressWarnings("unused")
- public AvroSchemaSerializerConfigSnapshot() {}
+ public AvroSchemaSerializerConfigSnapshot() {
+ }
+ /**
+ * AvroSerializer now uses the new {@link
AvroSerializerSnapshot} class instead.
+ */
+ @SuppressWarnings("unused")
public AvroSchemaSerializerConfigSnapshot(String schemaString) {
this.schemaString = checkNotNull(schemaString);
}
@@ -485,47 +487,4 @@ public String toString() {
}
}
- /**
- * The outdated config snapshot, retained for backwards compatibility.
- *
- * @deprecated The {@link AvroSchemaSerializerConfigSnapshot} should be
used instead.
- */
- @Deprecated
- public static class AvroSerializerConfigSnapshot<T> extends
KryoRegistrationSerializerConfigSnapshot<T> {
-
- private static final int VERSION = 1;
-
- private Class<? extends T> typeToInstantiate;
-
- public AvroSerializerConfigSnapshot() {}
-
- @Override
- public void write(DataOutputView out) throws IOException {
- super.write(out);
-
- out.writeUTF(typeToInstantiate.getName());
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void read(DataInputView in) throws IOException {
- super.read(in);
-
- String classname = in.readUTF();
- try {
- typeToInstantiate = (Class<? extends T>)
Class.forName(classname, true, getUserCodeClassLoader());
- } catch (ClassNotFoundException e) {
- throw new IOException("Cannot find requested
class " + classname + " in classpath.", e);
- }
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
-
- public Class<? extends T> getTypeToInstantiate() {
- return typeToInstantiate;
- }
- }
}
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
new file mode 100644
index 00000000000..cb549ebd98e
--- /dev/null
+++
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static
org.apache.flink.formats.avro.typeutils.AvroSerializer.isGenericRecord;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@code Avro} specific implementation of a {@link TypeSerializerSnapshot}.
+ *
+ * @param <T> The data type that the originating serializer of this
configuration serializes.
+ */
+public final class AvroSerializerSnapshot<T> implements
TypeSerializerSnapshot<T> {
+ private Class<T> runtimeType;
+ private Schema schema;
+ private Schema runtimeSchema;
+
+ @SuppressWarnings("WeakerAccess")
+ public AvroSerializerSnapshot() {
+ // this constructor is used when restoring from a checkpoint.
+ }
+
+ AvroSerializerSnapshot(Schema schema, Class<T> runtimeType) {
+ this.schema = schema;
+ this.runtimeType = runtimeType;
+ }
+
+ @Override
+ public int getCurrentVersion() {
+ return 1;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ checkNotNull(runtimeType);
+ checkNotNull(schema);
+
+ out.writeUTF(runtimeType.getName());
+ out.writeUTF(schema.toString(false));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void read(int readVersion, DataInputView in, ClassLoader
userCodeClassLoader) throws IOException {
+ final String previousRuntimeTypeName = in.readUTF();
+ final String previousSchemaDefinition = in.readUTF();
+
+ this.runtimeType = findClassOrThrow(userCodeClassLoader,
previousRuntimeTypeName);
+ this.schema = parseAvroSchema(previousSchemaDefinition);
+ this.runtimeSchema = tryExtractAvroSchema(userCodeClassLoader,
runtimeType);
+ }
+
+ @Override
+ public <NS extends TypeSerializer<T>>
TypeSerializerSchemaCompatibility<T, NS>
+ resolveSchemaCompatibility(NS newSerializer) {
+ if (!(newSerializer instanceof AvroSerializer)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ AvroSerializer<?> newAvroSerializer = (AvroSerializer<?>)
newSerializer;
+ return resolveSchemaCompatibility(schema,
newAvroSerializer.getAvroSchema());
+ }
+
+ @Override
+ public TypeSerializer<T> restoreSerializer() {
+ checkNotNull(runtimeType);
+ checkNotNull(schema);
+
+ if (runtimeSchema != null) {
+ return new AvroSerializer<>(runtimeType, new
SerializableAvroSchema(runtimeSchema), new SerializableAvroSchema(schema));
+ }
+ else {
+ return new AvroSerializer<>(runtimeType, new
SerializableAvroSchema(schema), new SerializableAvroSchema(schema));
+ }
+ }
+
+ //
------------------------------------------------------------------------------------------------------------
+ // Helpers
+ //
------------------------------------------------------------------------------------------------------------
+
+ /**
+ * Resolves writer/reader schema compatibility.
+ *
+ * <p>Checks whether a new version of a schema (reader) can read
values serialized with the old schema (writer),
+ * i.e. whether the schemas are compatible according to {@code Avro} schema
resolution rules
+ * (see <a
href="https://avro.apache.org/docs/current/spec.html#Schema+Resolution">Schema
Resolution</a>).
+ */
+ @VisibleForTesting
+ static <T, NS extends TypeSerializer<T>>
TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(
+ Schema writerSchema,
+ Schema readerSchema) {
+
+ if (Objects.equals(writerSchema, readerSchema)) {
+ return
TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
+
+ final SchemaPairCompatibility compatibility =
+
SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
+
+ return avroCompatibilityToFlinkCompatibility(compatibility);
+ }
+
+ private static <T, NS extends TypeSerializer<T>>
TypeSerializerSchemaCompatibility<T, NS>
+ avroCompatibilityToFlinkCompatibility(SchemaPairCompatibility
compatibility) {
+ switch (compatibility.getType()) {
+ case COMPATIBLE: {
+ // The new serializer would be able to read
data persisted with *this* serializer, therefore no migration
+ // is required.
+ return
TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
+ case INCOMPATIBLE: {
+ return
TypeSerializerSchemaCompatibility.incompatible();
+ }
+ case RECURSION_IN_PROGRESS:
+ default:
+ return
TypeSerializerSchemaCompatibility.incompatible();
+ }
+ }
+
+ private static Schema parseAvroSchema(String previousSchemaDefinition) {
+ Schema.Parser parser = new Schema.Parser();
+ return parser.parse(previousSchemaDefinition);
+ }
+
+ private static Schema tryExtractAvroSchema(ClassLoader cl, Class<?>
runtimeType) {
+ if (isGenericRecord(runtimeType)) {
+ return null;
+ }
+ if (isSpecificRecord(runtimeType)) {
+ SpecificData d = new SpecificData(cl);
+ return d.getSchema(runtimeType);
+ }
+ ReflectData d = new ReflectData(cl);
+ return d.getSchema(runtimeType);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Class<T> findClassOrThrow(ClassLoader
userCodeClassLoader, String className) {
+ try {
+ Class<?> runtimeTarget = Class.forName(className,
false, userCodeClassLoader);
+ return (Class<T>) runtimeTarget;
+ }
+ catch (ClassNotFoundException e) {
+ throw new IllegalStateException(""
+ + "Unable to find the class '" + className + "'
which is used to deserialize "
+ + "the elements of this serializer. "
+ + "Were the class was moved or renamed?", e);
+ }
+ }
+
+ private static boolean isSpecificRecord(Class<?> runtimeType) {
+ return SpecificRecord.class.isAssignableFrom(runtimeType);
+ }
+}
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index 644ee50d361..09ce1868707 100644
---
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -33,7 +33,6 @@
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
/**
* Special type information to generate a special AvroTypeInfo for Avro POJOs
(implementing SpecificRecordBase, the typed Avro POJOs)
@@ -50,52 +49,22 @@
private static final long serialVersionUID = 1L;
- private static final ConcurrentHashMap<Thread, Boolean>
IN_BACKWARDS_COMPATIBLE_MODE = new ConcurrentHashMap<>();
-
- private final boolean useBackwardsCompatibleSerializer;
-
/**
* Creates a new Avro type info for the given class.
*/
public AvroTypeInfo(Class<T> typeClass) {
- this(typeClass, false);
- }
-
- /**
- * Creates a new Avro type info for the given class.
- *
- * <p>This constructor takes a flag to specify whether a serializer
- * that is backwards compatible with PoJo-style serialization of Avro
types should be used.
- * That is only necessary, if one has a Flink 1.3 (or earlier)
savepoint where Avro types
- * were stored in the checkpointed state. New Flink programs will never
need this.
- */
- public AvroTypeInfo(Class<T> typeClass, boolean
useBackwardsCompatibleSerializer) {
- super(typeClass, generateFieldsFromAvroSchema(typeClass,
useBackwardsCompatibleSerializer));
-
- final Boolean modeOnStack =
IN_BACKWARDS_COMPATIBLE_MODE.get(Thread.currentThread());
- this.useBackwardsCompatibleSerializer = modeOnStack == null ?
- useBackwardsCompatibleSerializer : modeOnStack;
+ super(typeClass, generateFieldsFromAvroSchema(typeClass));
}
@Override
@SuppressWarnings("deprecation")
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
- return useBackwardsCompatibleSerializer ?
- new
BackwardsCompatibleAvroSerializer<>(getTypeClass()) :
- new AvroSerializer<>(getTypeClass());
+ return new AvroSerializer<>(getTypeClass());
}
@SuppressWarnings("unchecked")
@Internal
- public static <T extends SpecificRecordBase> List<PojoField>
generateFieldsFromAvroSchema(
- Class<T> typeClass,
- boolean useBackwardsCompatibleSerializer) {
-
- final Thread currentThread = Thread.currentThread();
- final boolean entryPoint =
-
IN_BACKWARDS_COMPATIBLE_MODE.putIfAbsent(currentThread,
useBackwardsCompatibleSerializer) == null;
-
- try {
+ private static <T extends SpecificRecordBase> List<PojoField>
generateFieldsFromAvroSchema(Class<T> typeClass) {
PojoTypeExtractor pte = new PojoTypeExtractor();
ArrayList<Type> typeHierarchy = new ArrayList<>();
typeHierarchy.add(typeClass);
@@ -121,12 +90,6 @@ public AvroTypeInfo(Class<T> typeClass, boolean
useBackwardsCompatibleSerializer
newFields.add(newField);
}
return newFields;
- }
- finally {
- if (entryPoint) {
-
IN_BACKWARDS_COMPATIBLE_MODE.remove(currentThread);
- }
- }
}
private static class PojoTypeExtractor extends TypeExtractor {
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
deleted file mode 100644
index 63b79b2d064..00000000000
---
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.avro.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
-import
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
-import
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
-
-import org.apache.avro.specific.SpecificRecordBase;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * An Avro serializer that can switch back to a KryoSerializer or a Pojo
Serializer, if
- * it has to ensure compatibility with one of those.
- *
- * <p>This serializer is there only as a means to explicitly fall back to PoJo
serialization
- * in the case where an upgrade from an earlier savepoint was made.
- *
- * @param <T> The type to be serialized.
- */
-@SuppressWarnings("deprecation")
-public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
-
- private static final long serialVersionUID = 1L;
-
- /** The type to serialize. */
- private final Class<T> type;
-
- /** The type serializer currently used. Avro by default. */
- private TypeSerializer<T> serializer;
-
- /**
- * Creates a new backwards-compatible Avro Serializer, for the given
type.
- */
- public BackwardsCompatibleAvroSerializer(Class<T> type) {
- this.type = type;
- this.serializer = new AvroSerializer<>(type);
- }
-
- /**
- * Private copy constructor.
- */
- private BackwardsCompatibleAvroSerializer(Class<T> type,
TypeSerializer<T> serializer) {
- this.type = type;
- this.serializer = serializer;
- }
-
- //
------------------------------------------------------------------------
- // Properties
- //
------------------------------------------------------------------------
-
- @Override
- public boolean isImmutableType() {
- return serializer.isImmutableType();
- }
-
- @Override
- public int getLength() {
- return serializer.getLength();
- }
-
- //
------------------------------------------------------------------------
- // Serialization
- //
------------------------------------------------------------------------
-
- @Override
- public T createInstance() {
- return serializer.createInstance();
- }
-
- @Override
- public void serialize(T value, DataOutputView target) throws
IOException {
- serializer.serialize(value, target);
- }
-
- @Override
- public T deserialize(DataInputView source) throws IOException {
- return serializer.deserialize(source);
- }
-
- @Override
- public T deserialize(T reuse, DataInputView source) throws IOException {
- return serializer.deserialize(reuse, source);
- }
-
- //
------------------------------------------------------------------------
- // Copying
- //
------------------------------------------------------------------------
-
- @Override
- public T copy(T from) {
- return serializer.copy(from);
- }
-
- @Override
- public T copy(T from, T reuse) {
- return serializer.copy(from, reuse);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws
IOException {
- serializer.copy(source, target);
- }
-
- //
------------------------------------------------------------------------
- // Utilities
- //
------------------------------------------------------------------------
-
- @Override
- public TypeSerializer<T> duplicate() {
- return new BackwardsCompatibleAvroSerializer<>(type,
serializer.duplicate());
- }
-
- @Override
- public int hashCode() {
- return type.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- else if (obj != null && obj.getClass() ==
BackwardsCompatibleAvroSerializer.class) {
- final BackwardsCompatibleAvroSerializer that =
(BackwardsCompatibleAvroSerializer) obj;
- return this.type == that.type &&
this.serializer.equals(that.serializer);
- }
- else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj.getClass() == this.getClass();
- }
-
- @Override
- public String toString() {
- return getClass().getName() + " (" + type.getName() + ')';
- }
-
- //
------------------------------------------------------------------------
- // Configuration Snapshots and Upgrades
- //
------------------------------------------------------------------------
-
- @Override
- public TypeSerializerSnapshot<T> snapshotConfiguration() {
- // we return the configuration of the actually used serializer
here
- return serializer.snapshotConfiguration();
- }
-
- @Override
- public CompatibilityResult<T>
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
- if (configSnapshot instanceof
AvroSchemaSerializerConfigSnapshot ||
- configSnapshot instanceof
AvroSerializerConfigSnapshot) {
-
- // avro serializer, nice :-)
- checkState(serializer instanceof AvroSerializer,
- "Serializer was changed backwards to
PojoSerializer and now encounters AvroSerializer snapshot.");
-
- return serializer.ensureCompatibility(configSnapshot);
- }
- else if (configSnapshot instanceof
PojoSerializerConfigSnapshot) {
- // common previous case
-
checkState(SpecificRecordBase.class.isAssignableFrom(type),
- "BackwardsCompatibleAvroSerializer
resuming a state serialized " +
- "via a PojoSerializer,
but not for an Avro Specific Record");
-
- final AvroTypeInfo<? extends SpecificRecordBase>
typeInfo =
- new
AvroTypeInfo<>(type.asSubclass(SpecificRecordBase.class), true);
-
- @SuppressWarnings("unchecked")
- final TypeSerializer<T> pojoSerializer =
- (TypeSerializer<T>)
typeInfo.createPojoSerializer(new ExecutionConfig());
- this.serializer = pojoSerializer;
- return serializer.ensureCompatibility(configSnapshot);
- }
- else if (configSnapshot instanceof
KryoRegistrationSerializerConfigSnapshot) {
- // force-kryo old case common previous case
- // we create a new Kryo Serializer with a blank
execution config.
- // registrations are anyways picked up from the
snapshot.
- serializer = new KryoSerializer<>(type, new
ExecutionConfig());
- return serializer.ensureCompatibility(configSnapshot);
- }
- else {
- // completely incompatible type, needs migration
- return CompatibilityResult.requiresMigration();
- }
- }
-}
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
new file mode 100644
index 00000000000..fb7114489d6
--- /dev/null
+++
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Parser;
+import org.apache.avro.reflect.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * A wrapper for Avro {@link Schema}, that is Java serializable.
+ */
+@Internal
+final class SerializableAvroSchema implements Serializable {
+
+ private static final long serialVersionUID = 1;
+
+ private transient @Nullable Schema schema;
+
+ SerializableAvroSchema() {
+ }
+
+ SerializableAvroSchema(Schema schema) {
+ this.schema = schema;
+ }
+
+ Schema getAvroSchema() {
+ return schema;
+ }
+
+ private void writeObject(ObjectOutputStream oos) throws IOException {
+ if (schema == null) {
+ oos.writeBoolean(false);
+ }
+ else {
+ oos.writeBoolean(true);
+ oos.writeUTF(schema.toString(false));
+ }
+ }
+
+ private void readObject(ObjectInputStream ois) throws
ClassNotFoundException, IOException {
+ if (ois.readBoolean()) {
+ String schema = ois.readUTF();
+ this.schema = new Parser().parse(schema);
+ }
+ else {
+ this.schema = null;
+ }
+ }
+}
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
new file mode 100644
index 00000000000..a6d98dccdc4
--- /dev/null
+++
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericRecord;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.function.Function;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test {@link AvroSerializerSnapshot}.
+ */
+public class AvroSerializerSnapshotTest {
+
+ private static final Schema FIRST_NAME = SchemaBuilder.record("name")
+ .namespace("org.apache.flink")
+ .fields()
+ .requiredString("first")
+ .endRecord();
+
+ private static final Schema FIRST_REQUIRED_LAST_OPTIONAL =
SchemaBuilder.record("name")
+ .namespace("org.apache.flink")
+ .fields()
+ .requiredString("first")
+ .optionalString("last")
+ .endRecord();
+
+ private static final Schema BOTH_REQUIRED = SchemaBuilder.record("name")
+ .namespace("org.apache.flink")
+ .fields()
+ .requiredString("first")
+ .requiredString("last")
+ .endRecord();
+
+ @Test
+ public void sameSchemaShouldBeCompatibleAsIs() {
+
assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_NAME,
FIRST_NAME), isCompatibleAsIs());
+ }
+
+ @Test
+ public void removingAnOptionalFieldsIsCompatibleAsIs() {
+
assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_REQUIRED_LAST_OPTIONAL,
FIRST_NAME),
+ isCompatibleAsIs());
+ }
+
+ @Test
+ public void addingAnOptionalFieldsIsCompatibleAsIs() {
+
assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_NAME,
FIRST_REQUIRED_LAST_OPTIONAL),
+ isCompatibleAsIs());
+ }
+
+ @Test
+ public void addingARequiredMakesSerializersIncompatible() {
+
assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_REQUIRED_LAST_OPTIONAL,
BOTH_REQUIRED),
+ isIncompatible());
+ }
+
+ @Test
+ public void anAvroSnapshotIsCompatibleWithItsOriginatingSerializer() {
+ AvroSerializer<GenericRecord> serializer =
+ new AvroSerializer<>(GenericRecord.class,
FIRST_REQUIRED_LAST_OPTIONAL);
+
+ TypeSerializerSnapshot<GenericRecord> snapshot =
serializer.snapshotConfiguration();
+
+ assertThat(snapshot.resolveSchemaCompatibility(serializer),
isCompatibleAsIs());
+ }
+
+ @Test
+ public void anAvroSnapshotIsCompatibleAfterARoundTrip() throws
IOException {
+ AvroSerializer<GenericRecord> serializer =
+ new AvroSerializer<>(GenericRecord.class,
FIRST_REQUIRED_LAST_OPTIONAL);
+
+ AvroSerializerSnapshot<GenericRecord> restored =
roundTrip(serializer.snapshotConfiguration());
+
+ assertThat(restored.resolveSchemaCompatibility(serializer),
isCompatibleAsIs());
+ }
+
+ @Test
+ public void anAvroSpecificRecordIsCompatibleAfterARoundTrip() throws
IOException {
+ // user is an avro generated test object.
+ AvroSerializer<User> serializer = new
AvroSerializer<>(User.class);
+
+ AvroSerializerSnapshot<User> restored =
roundTrip(serializer.snapshotConfiguration());
+
+ assertThat(restored.resolveSchemaCompatibility(serializer),
isCompatibleAsIs());
+ }
+
+ @Test
+ public void aPojoIsCompatibleAfterARoundTrip() throws IOException {
+ AvroSerializer<Pojo> serializer = new
AvroSerializer<>(Pojo.class);
+
+ AvroSerializerSnapshot<Pojo> restored =
roundTrip(serializer.snapshotConfiguration());
+
+ assertThat(restored.resolveSchemaCompatibility(serializer),
isCompatibleAsIs());
+ }
+
+ @Test
+ public void
recordSerializedShouldBeDeserializeWithTheResortedSerializer() throws
IOException {
+ // user is an avro generated test object.
+ final User user = TestDataGenerator.generateRandomUser(new
Random());
+ final AvroSerializer<User> originalSerializer = new
AvroSerializer<>(User.class);
+ //
+ // first serialize the record
+ //
+ ByteBuffer serializedUser = serialize(originalSerializer, user);
+ //
+ // then restore a serializer from the snapshot
+ //
+ TypeSerializer<User> restoredSerializer =
originalSerializer.snapshotConfiguration().restoreSerializer();
+ //
+ // now deserialize the user with the restored serializer.
+ //
+ User restoredUser = deserialize(restoredSerializer,
serializedUser);
+
+ assertThat(user, is(restoredUser));
+ }
+
+ @Test
+ public void validSchemaEvaluationShouldResultInCompatibleSerializers() {
+ final AvroSerializer<GenericRecord> originalSerializer = new
AvroSerializer<>(GenericRecord.class, FIRST_NAME);
+ final AvroSerializer<GenericRecord> newSerializer = new
AvroSerializer<>(GenericRecord.class, FIRST_REQUIRED_LAST_OPTIONAL);
+
+ TypeSerializerSnapshot<GenericRecord> originalSnapshot =
originalSerializer.snapshotConfiguration();
+
+
assertThat(originalSnapshot.resolveSchemaCompatibility(newSerializer),
isCompatibleAsIs());
+ }
+
+ @Test
+ public void
nonValidSchemaEvaluationShouldResultInCompatibleSerializers() {
+ final AvroSerializer<GenericRecord> originalSerializer = new
AvroSerializer<>(GenericRecord.class, FIRST_REQUIRED_LAST_OPTIONAL);
+ final AvroSerializer<GenericRecord> newSerializer = new
AvroSerializer<>(GenericRecord.class, BOTH_REQUIRED);
+
+ TypeSerializerSnapshot<GenericRecord> originalSnapshot =
originalSerializer.snapshotConfiguration();
+
+
assertThat(originalSnapshot.resolveSchemaCompatibility(newSerializer),
isIncompatible());
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void
changingFromGenericToSpecificWithCompatibleSchemaShouldResultInCompatibleSerializers()
{
+ // starting with a generic serializer
+ AvroSerializer<Object> generic = new
AvroSerializer(GenericRecord.class, User.SCHEMA$);
+ TypeSerializerSnapshot<Object> genericSnapshot =
generic.snapshotConfiguration();
+
+ // then upgrading to a specific serializer
+ AvroSerializer<Object> specificSerializer = new
AvroSerializer(User.class);
+ specificSerializer.snapshotConfiguration();
+
+
assertThat(genericSnapshot.resolveSchemaCompatibility(specificSerializer),
isCompatibleAsIs());
+ }
+
+ //
---------------------------------------------------------------------------------------------------------------
+ // Matchers
+ //
---------------------------------------------------------------------------------------------------------------
+
+ private Matcher<TypeSerializerSchemaCompatibility> isCompatibleAsIs() {
+ return
matcher(TypeSerializerSchemaCompatibility::isCompatibleAsIs, "compatible as
is");
+ }
+
+ private Matcher<TypeSerializerSchemaCompatibility>
isCompatibleAfterMigration() {
+ return
matcher(TypeSerializerSchemaCompatibility::isCompatibleAfterMigration,
+ "compatible after migration");
+ }
+
+ private Matcher<TypeSerializerSchemaCompatibility> isIncompatible() {
+ return
matcher(TypeSerializerSchemaCompatibility::isIncompatible,
+ "incompatible");
+ }
+
+ private static <T> Matcher<T> matcher(Function<T, Boolean> predicate,
String message) {
+ return new TypeSafeDiagnosingMatcher<T>() {
+
+ @Override
+ protected boolean matchesSafely(T item, Description
mismatchDescription) {
+ if (predicate.apply(item)) {
+ return true;
+ }
+ mismatchDescription.appendText("not
").appendText(message);
+ return false;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ }
+ };
+ }
+
+ //
---------------------------------------------------------------------------------------------------------------
+ // Utils
+ //
---------------------------------------------------------------------------------------------------------------
+
+ /**
+ * Serialize an (avro)TypeSerializerSnapshot and deserialize it.
+ */
+ private static <T> AvroSerializerSnapshot<T>
roundTrip(TypeSerializerSnapshot<T> original) throws IOException {
+ // write
+ DataOutputSerializer out = new DataOutputSerializer(1024);
+ original.write(out);
+
+ // init
+ AvroSerializerSnapshot<T> restored = new
AvroSerializerSnapshot<>();
+
+ // read
+ DataInputView in = new
DataInputDeserializer(out.wrapAsByteBuffer());
+ restored.read(restored.getCurrentVersion(), in,
original.getClass().getClassLoader());
+
+ return restored;
+ }
+
+ private static <T> ByteBuffer serialize(TypeSerializer<T> serializer, T
record) throws IOException {
+ DataOutputSerializer out = new DataOutputSerializer(1024);
+ serializer.serialize(record, out);
+ return out.wrapAsByteBuffer();
+ }
+
+ private static <T> T deserialize(TypeSerializer<T> serializer,
ByteBuffer serializedRecord) throws IOException {
+ DataInputView in = new DataInputDeserializer(serializedRecord);
+ return serializer.deserialize(in);
+ }
+
+ //
---------------------------------------------------------------------------------------------------------------
+ // Test classes
+ //
---------------------------------------------------------------------------------------------------------------
+
+ private static class Pojo {
+ private String foo;
+
+ public String getFoo() {
+ return foo;
+ }
+
+ public void setFoo(String foo) {
+ this.foo = foo;
+ }
+ }
+}
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
deleted file mode 100644
index 7b8763bfa2f..00000000000
---
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.avro.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.formats.avro.generated.SimpleUser;
-import org.apache.flink.formats.avro.utils.TestDataGenerator;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This test ensures that state and state configuration created by Flink 1.3
Avro types
- * that used the PojoSerializer still works (in most cases, see notice below).
- *
- * <p><b>Important:</b> Since Avro itself broke class compatibility between
1.7.7 (used in Flink 1.3)
- * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken
through Avro already.
- * This test only tests that the Avro serializer change (switching from Pojo
to Avro for Avro types)
- * works properly.
- *
- * <p>This test can be dropped once we drop backwards compatibility with Flink
1.3 snapshots.
- *
- * <p>The {@link BackwardsCompatibleAvroSerializer} does not support custom
Kryo registrations (which
- * logical types require for Avro 1.8 because Kryo does not support
Joda-Time). We introduced a
- * simpler user record for pre-Avro 1.8 test cases.
- */
-public class BackwardsCompatibleAvroSerializerTest {
-
- private static final String SNAPSHOT_RESOURCE =
"flink-1.6-avro-type-serializer-snapshot";
-
- private static final String DATA_RESOURCE =
"flink-1.6-avro-type-serialized-data";
-
- @SuppressWarnings("unused")
- private static final String SNAPSHOT_RESOURCE_WRITER =
"/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" +
SNAPSHOT_RESOURCE;
-
- @SuppressWarnings("unused")
- private static final String DATA_RESOURCE_WRITER =
"/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" +
DATA_RESOURCE;
-
- private static final long RANDOM_SEED = 143065108437678L;
-
- private static final int NUM_DATA_ENTRIES = 20;
-
- @Test
- public void testCompatibilityWithPojoSerializer() throws Exception {
-
- // retrieve the old config snapshot
-
- final TypeSerializer<SimpleUser> serializer;
- final TypeSerializerSnapshot<SimpleUser> configSnapshot;
-
- try (InputStream in =
getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) {
- DataInputViewStreamWrapper inView = new
DataInputViewStreamWrapper(in);
-
- List<Tuple2<TypeSerializer<?>,
TypeSerializerSnapshot<?>>> deserialized =
-
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
- inView,
getClass().getClassLoader());
-
- assertEquals(1, deserialized.size());
-
- @SuppressWarnings("unchecked")
- final TypeSerializer<SimpleUser> typedSerializer =
(TypeSerializer<SimpleUser>) deserialized.get(0).f0;
-
- serializer = typedSerializer;
- configSnapshot = (TypeSerializerSnapshot<SimpleUser>)
deserialized.get(0).f1;
- }
-
- assertNotNull(serializer);
- assertNotNull(configSnapshot);
-
- assertTrue(serializer instanceof PojoSerializer);
- assertTrue(configSnapshot instanceof
PojoSerializer.PojoSerializerConfigSnapshot);
-
- // sanity check for the test: check that the test data works
with the original serializer
- validateDeserialization(serializer);
-
- // sanity check for the test: check that a PoJoSerializer and
the original serializer work together
-
assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
-
- final TypeSerializer<SimpleUser> newSerializer = new
AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
-
assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
-
- // deserialize the data and make sure this still works
- validateDeserialization(newSerializer);
-
- TypeSerializerSnapshot<SimpleUser> nextSnapshot =
newSerializer.snapshotConfiguration();
- final TypeSerializer<SimpleUser> nextSerializer = new
AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
-
-
assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
-
- // deserialize the data and make sure this still works
- validateDeserialization(nextSerializer);
- }
-
- private static void validateDeserialization(TypeSerializer<SimpleUser>
serializer) throws IOException {
- final Random rnd = new Random(RANDOM_SEED);
-
- try (InputStream in =
BackwardsCompatibleAvroSerializerTest.class.getClassLoader()
- .getResourceAsStream(DATA_RESOURCE)) {
-
- final DataInputViewStreamWrapper inView = new
DataInputViewStreamWrapper(in);
-
- for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
- final SimpleUser deserialized =
serializer.deserialize(inView);
-
- // deterministically generate a reference record
- final SimpleUser reference =
TestDataGenerator.generateRandomSimpleUser(rnd);
-
- assertEquals(reference, deserialized);
- }
- }
- }
-
-// run this code to generate the test data
-// public static void main(String[] args) throws Exception {
-//
-// AvroTypeInfo<SimpleUser> typeInfo = new
AvroTypeInfo<>(SimpleUser.class);
-//
-// TypeSerializer<SimpleUser> serializer =
typeInfo.createPojoSerializer(new ExecutionConfig());
-// TypeSerializerConfigSnapshot confSnapshot =
serializer.snapshotConfiguration();
-//
-// try (FileOutputStream fos = new
FileOutputStream(SNAPSHOT_RESOURCE_WRITER)) {
-// DataOutputViewStreamWrapper out = new
DataOutputViewStreamWrapper(fos);
-//
-//
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
-// out,
-// Collections.singletonList(
-// new
Tuple2<>(serializer, confSnapshot)));
-// }
-//
-// try (FileOutputStream fos = new
FileOutputStream(DATA_RESOURCE_WRITER)) {
-// final DataOutputViewStreamWrapper out = new
DataOutputViewStreamWrapper(fos);
-// final Random rnd = new Random(RANDOM_SEED);
-//
-// for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
-//
serializer.serialize(TestDataGenerator.generateRandomSimpleUser(rnd), out);
-// }
-// }
-// }
-}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Upgrade AvroSerializer snapshot to implement new TypeSerializerSnapshot
> interface
> ---------------------------------------------------------------------------------
>
> Key: FLINK-10605
> URL: https://issues.apache.org/jira/browse/FLINK-10605
> Project: Flink
> Issue Type: Sub-task
> Components: Type Serialization System
> Reporter: Igal Shilman
> Assignee: Igal Shilman
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This issue introduces a new AvroSerializerSnapshot implementation that
> conforms to the new TypeSerializerSnapshot API.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)