This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 479ebd5  [FLINK-11436] [avro] Manually Java-deserialize AvroSerializer 
for backwards compatibility
479ebd5 is described below

commit 479ebd59872160cb2060605b08dcd0d86c3cb78e
Author: Igal Shilman <[email protected]>
AuthorDate: Sun Jan 27 08:35:52 2019 +0100

    [FLINK-11436] [avro] Manually Java-deserialize AvroSerializer for backwards 
compatibility
    
    During the release of Flink 1.7, the value of serialVersionUID was uptick 
to 2L (was 1L before)
    And although the AvroSerializer (along with it's snapshot class) were 
migrated to the new serialization
    abstraction (hence free from Java serialization), there were composite 
serializers that were not migrated
    and were serialized with Java serialization.
    This commit manually Java-Deserializes the AvroSerializer to support 
backwards compatability.
    
    This closes #7580.
---
 .../org/apache/flink/util/InstantiationUtil.java   |  11 +-
 .../flink/formats/avro/typeutils/AvroFactory.java  |   6 +-
 .../formats/avro/typeutils/AvroSerializer.java     | 106 +++++++++++-
 .../typeutils/AvroSerializerMigrationTest.java     | 188 +++++++++++++++++++++
 4 files changed, 297 insertions(+), 14 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index eab8f4c..f4600a2 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -210,11 +210,12 @@ public final class InstantiationUtil {
 
                        final Class localClass = 
resolveClass(streamClassDescriptor);
                        final String name = localClass.getName();
-                       if (scalaSerializerClassnames.contains(name) || 
scalaTypes.contains(name) || isAnonymousClass(localClass)) {
+                       if (scalaSerializerClassnames.contains(name) || 
scalaTypes.contains(name) || isAnonymousClass(localClass)
+                               || isOldAvroSerializer(name, 
streamClassDescriptor.getSerialVersionUID())) {
                                final ObjectStreamClass localClassDescriptor = 
ObjectStreamClass.lookup(localClass);
                                if (localClassDescriptor != null
                                        && 
localClassDescriptor.getSerialVersionUID() != 
streamClassDescriptor.getSerialVersionUID()) {
-                                       LOG.warn("Ignoring serialVersionUID 
mismatch for anonymous class {}; was {}, now {}.",
+                                       LOG.warn("Ignoring serialVersionUID 
mismatch for class {}; was {}, now {}.",
                                                
streamClassDescriptor.getName(), streamClassDescriptor.getSerialVersionUID(), 
localClassDescriptor.getSerialVersionUID());
 
                                        streamClassDescriptor = 
localClassDescriptor;
@@ -223,6 +224,7 @@ public final class InstantiationUtil {
 
                        return streamClassDescriptor;
                }
+
        }
 
        private static boolean isAnonymousClass(Class clazz) {
@@ -242,6 +244,11 @@ public final class InstantiationUtil {
                }
        }
 
+       private static boolean isOldAvroSerializer(String name, long 
serialVersionUID) {
+               // please see FLINK-11436 for details on why we need to ignore 
serial version UID here for the AvroSerializer
+               return (serialVersionUID == 1) && 
"org.apache.flink.formats.avro.typeutils.AvroSerializer".equals(name);
+       }
+
        /**
         * A mapping between the full path of a deprecated serializer and its 
equivalent.
         * These mappings are hardcoded and fixed.
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
index 0ca25bf..9a8bdcb 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
@@ -86,9 +86,9 @@ final class AvroFactory<T> {
                return fromReflective(type, cl, 
Optional.ofNullable(previousSchema));
        }
 
-       static <T> AvroFactory<T> createFromTypeAndSchemaString(Class<T> type, 
@Nullable String schemaString) {
-               Schema schema = (schemaString != null) ? new 
Schema.Parser().parse(schemaString) : null;
-               return create(type, schema, null);
+       @Nullable
+       static Schema parseSchemaString(@Nullable String schemaString) {
+               return (schemaString == null) ? null : new 
Schema.Parser().parse(schemaString);
        }
 
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index 2be660a..da51117 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -32,12 +32,15 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.Nullable;
 import org.apache.avro.specific.SpecificRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.io.ObjectInputStream;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -75,11 +78,9 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 
        // -------- configuration fields, serializable -----------
 
-       /** The class of the type that is serialized by this serializer.
-        */
-       private final Class<T> type;
-       private final SerializableAvroSchema schema;
-       private final SerializableAvroSchema previousSchema;
+       @Nonnull private Class<T> type;
+       @Nonnull private SerializableAvroSchema schema;
+       @Nonnull private SerializableAvroSchema previousSchema;
 
        // -------- runtime fields, non-serializable, lazily initialized 
-----------
 
@@ -127,10 +128,10 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
         * Creates a new AvroSerializer for the type indicated by the given 
class.
         */
        @Internal
-       AvroSerializer(Class<T> type, @Nullable SerializableAvroSchema 
newSchema, @Nullable SerializableAvroSchema previousSchema) {
+       AvroSerializer(Class<T> type, SerializableAvroSchema newSchema, 
SerializableAvroSchema previousSchema) {
                this.type = checkNotNull(type);
-               this.schema = newSchema;
-               this.previousSchema = previousSchema;
+               this.schema = checkNotNull(newSchema);
+               this.previousSchema = checkNotNull(previousSchema);
        }
 
        /**
@@ -144,6 +145,7 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 
        // 
------------------------------------------------------------------------
 
+       @Nonnull
        public Class<T> getType() {
                return type;
        }
@@ -381,4 +383,90 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
                }
 
        }
+
+       // -------- backwards compatibility with 1.5, 1.6 -----------
+
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               /*
+               Please see FLINK-11436 for details on why manual 
deserialization is required.
+
+               During the release of Flink 1.7, the value of serialVersionUID 
was uptick to 2L (was 1L before)
+               And although the AvroSerializer (along with it's snapshot 
class) were migrated to the new serialization
+               abstraction (hence free from Java serialization), there were 
composite serializers that were not migrated
+               and were serialized with Java serialization. In case that one 
of the nested serializers were Avro we would
+               bump into deserialization exception due to a wrong 
serialVersionUID. Unfortunately it is not possible to revert
+               the serialVersionUID back to 1L, because users might have 
snapshots with 2L present already.
+               To overcome this we first need to make sure that the 
AvroSerializer is being Java deserialized with
+               FailureTolerantObjectInputStream, and then we determine the 
serialized layout by looking at the fields.
+
+               From: 
https://docs.oracle.com/javase/8/docs/platform/serialization/spec/class.html#a5421
+               
-------------------------------------------------------------------------------------------------------------
+               The descriptors for primitive typed fields are written first
+               sorted by field name followed by descriptors for the object 
typed fields sorted by field name.
+               The names are sorted using String.compareTo.
+               
-------------------------------------------------------------------------------------------------------------
+
+               pre 1.6         field order:    [type]
+               pre 1.7         field order:    [schemaString,          type]
+               post 1.7        field order:    [previousSchema,        schema, 
        type]
+
+               We would use the first field to distinguish between the three 
different layouts.
+               To complicate things even further in pre 1.7, the field 
@schemaString could be
+               null or a string, but, in post 1.7, the field @previousSchema 
was never set to null, therefore
+               we can use the first field to determine the version.
+
+               this logic should stay here as long as we support Flink 1.6 
(along with Java serialized
+               TypeSerializers)
+               */
+               final Object firstField = in.readObject();
+
+               if (firstField == null) {
+                       // first field can only be NULL in 1.6 (schemaString)
+                       read16Layout(null, in);
+               }
+               else if (firstField instanceof String) {
+                       // first field is a String only in 1.6 (schemaString)
+                       read16Layout((String) firstField, in);
+               }
+               else if (firstField instanceof Class<?>) {
+                       // first field is a Class<?> only in 1.5 (type)
+                       @SuppressWarnings("unchecked") Class<T> type = 
(Class<T>) firstField;
+                       read15Layout(type);
+               }
+               else if (firstField instanceof SerializableAvroSchema) {
+                       readCurrentLayout((SerializableAvroSchema) firstField, 
in);
+               }
+               else {
+                       throw new IllegalStateException("Failed to 
Java-Deserialize an AvroSerializer instance. " +
+                               "Was expecting a first field to be either a 
String or SerializableAvroSchema, but got: " +
+                               "" + firstField.getClass());
+               }
+       }
+
+       private void read15Layout(Class<T> type) {
+               this.previousSchema = new SerializableAvroSchema();
+               this.schema = new SerializableAvroSchema();
+               this.type = type;
+       }
+
+       @SuppressWarnings("unchecked")
+       private void read16Layout(@Nullable String schemaString, 
ObjectInputStream in)
+                       throws IOException, ClassNotFoundException {
+
+               Schema schema = AvroFactory.parseSchemaString(schemaString);
+               Class<T> type = (Class<T>) in.readObject();
+
+               this.previousSchema = new SerializableAvroSchema();
+               this.schema = new SerializableAvroSchema(schema);
+               this.type = type;
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readCurrentLayout(SerializableAvroSchema previousSchema, 
ObjectInputStream in)
+                       throws IOException, ClassNotFoundException {
+
+               this.previousSchema = previousSchema;
+               this.schema = (SerializableAvroSchema) in.readObject();
+               this.type = (Class<T>) in.readObject();
+       }
 }
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
index 73b76a6..a162aa3 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
@@ -18,15 +18,34 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.testutils.migration.MigrationVersion;
 
 import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static junit.framework.TestCase.assertSame;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
  * Tests migrations for {@link AvroSerializerSnapshot}.
@@ -68,4 +87,173 @@ public class AvroSerializerMigrationTest extends 
TypeSerializerSnapshotMigration
                return testSpecifications.get();
        }
 
+       // 
---------------------------------------------------------------------------------------------------------------
+       // The following batch of tests are making sure that AvroSerializer 
class is able to be Java-Deserialized.
+       // see [FLINK-11436] for more information.
+
+       // Once we drop support for versions that carried snapshots with 
Java-Deserialized serializers we can drop this
+       // batch of tests.
+       // 
---------------------------------------------------------------------------------------------------------------
+
+       @Test
+       public void javaDeserializeFromFlink_1_5_ReflectiveRecord() throws 
IOException {
+               final String avroSerializerBase64 = 
"AAAAAQAAAQis7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
+                       
"U2VyaWFsaXplcgAAAAAAAAABAgABTAAEdHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFj\n"
 +
+                       
"aGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdnIA\n"
 +
+                       
"Tm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJvU2VyaWFsaXplck1pZ3Jh\n"
 +
+                       "dGlvblRlc3QkU2ltcGxlUG9qbwAAAAAAAAAAAAAAeHA=";
+
+               TypeSerializer<?> serializer = 
javaDeserialize(avroSerializerBase64);
+               assertThat(serializer, instanceOf(AvroSerializer.class));
+
+               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
javaDeserialize(avroSerializerBase64);
+               assertSame(avroSerializer.getType(), SimplePojo.class);
+               assertThat(avroSerializer.getAvroSchema(), notNullValue());
+       }
+
+       @Test
+       public void javaDeserializeFromFlink_1_5_SpecificRecord() throws 
IOException {
+               final String avroSerializerBase64 = 
"AAAAAQAAASOs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
+                       
"U2VyaWFsaXplcgAAAAAAAAABAgABTAAEdHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFj\n"
 +
+                       
"aGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdnIA\n"
 +
+                       
"L29yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLmdlbmVyYXRlZC5BZGRyZXNz7Paj+KjgQ2oMAAB4\n"
 +
+                       
"cgArb3JnLmFwYWNoZS5hdnJvLnNwZWNpZmljLlNwZWNpZmljUmVjb3JkQmFzZQKi+azGtzQdDAAAeHA=";
+
+               TypeSerializer<?> serializer = 
javaDeserialize(avroSerializerBase64);
+               assertThat(serializer, instanceOf(AvroSerializer.class));
+
+               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
javaDeserialize(avroSerializerBase64);
+               assertSame(avroSerializer.getType(), Address.class);
+               assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
+       }
+
+       @Test
+       public void javaDeserializeFromFlink_1_6() throws IOException {
+               final String avroSerializer = 
"AAAAAQAAAUis7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
+                       
"U2VyaWFsaXplcgAAAAAAAAABAgACTAAMc2NoZW1hU3RyaW5ndAASTGphdmEvbGFuZy9TdHJpbmc7TAAE\n"
 +
+                       
"dHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBl\n"
 +
+                       
"dXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwcHZyAC9vcmcuYXBhY2hlLmZsaW5rLmZvcm1h\n"
 +
+                       
"dHMuYXZyby5nZW5lcmF0ZWQuQWRkcmVzc+z2o/io4ENqDAAAeHIAK29yZy5hcGFjaGUuYXZyby5zcGVj\n"
 +
+                       "aWZpYy5TcGVjaWZpY1JlY29yZEJhc2UCovmsxrc0HQwAAHhw";
+
+               TypeSerializer<?> avro = javaDeserialize(avroSerializer);
+
+               assertThat(avro, instanceOf(AvroSerializer.class));
+       }
+
+       @Test
+       public void javaDeserializeFromFlink_1_6_GenericRecord() throws 
IOException {
+               String avroSerializerBase64 = 
"AAAAAQAAAges7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
+                       
"U2VyaWFsaXplcgAAAAAAAAABAgACTAAMc2NoZW1hU3RyaW5ndAASTGphdmEvbGFuZy9TdHJpbmc7TAAE\n"
 +
+                       
"dHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBl\n"
 +
+                       
"dXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdAEBeyJ0eXBlIjoicmVjb3JkIiwibmFtZSI6\n"
 +
+                       
"IkFkZHJlc3MiLCJuYW1lc3BhY2UiOiJvcmcuYXBhY2hlLmZsaW5rLmZvcm1hdHMuYXZyby5nZW5lcmF0\n"
 +
+                       
"ZWQiLCJmaWVsZHMiOlt7Im5hbWUiOiJudW0iLCJ0eXBlIjoiaW50In0seyJuYW1lIjoic3RyZWV0Iiwi\n"
 +
+                       
"dHlwZSI6InN0cmluZyJ9LHsibmFtZSI6ImNpdHkiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoic3Rh\n"
 +
+                       
"dGUiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoiemlwIiwidHlwZSI6InN0cmluZyJ9XX12cgAlb3Jn\n"
 +
+                       
"LmFwYWNoZS5hdnJvLmdlbmVyaWMuR2VuZXJpY1JlY29yZAAAAAAAAAAAAAAAeHA=";
+
+               TypeSerializer<?> serializer = 
javaDeserialize(avroSerializerBase64);
+
+               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
serializer;
+               assertSame(avroSerializer.getType(), GenericRecord.class);
+               assertThat(avroSerializer.getAvroSchema(), notNullValue());
+       }
+
+       @Test
+       public void javaDeserializeFromFlink_1_7() throws IOException {
+               String avroSerializerBase64 = 
"AAAAAQAAAeKs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
+                       
"U2VyaWFsaXplcgAAAAAAAAACAgADTAAOcHJldmlvdXNTY2hlbWF0AEBMb3JnL2FwYWNoZS9mbGluay9m\n"
 +
+                       
"b3JtYXRzL2F2cm8vdHlwZXV0aWxzL1NlcmlhbGl6YWJsZUF2cm9TY2hlbWE7TAAGc2NoZW1hcQB+AAFM\n"
 +
+                       
"AAR0eXBldAARTGphdmEvbGFuZy9DbGFzczt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5\n"
 +
+                       
"cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA+b3JnLmFwYWNoZS5mbGluay5mb3Jt\n"
 +
+                       
"YXRzLmF2cm8udHlwZXV0aWxzLlNlcmlhbGl6YWJsZUF2cm9TY2hlbWEAAAAAAAAAAQMAAHhwdwEAeHNx\n"
 +
+                       
"AH4ABXcBAHh2cgAvb3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkLkFkZHJlc3Ps\n"
 +
+                       
"9qP4qOBDagwAAHhyACtvcmcuYXBhY2hlLmF2cm8uc3BlY2lmaWMuU3BlY2lmaWNSZWNvcmRCYXNlAqL5\n"
 +
+                       "rMa3NB0MAAB4cA==";
+
+               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
javaDeserialize(avroSerializerBase64);
+               assertSame(avroSerializer.getType(), Address.class);
+               assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
+       }
+
+       @Test
+       public void javaDeserializeFromFlink_1_7_afterInitialization() throws 
IOException {
+               String avroSerializerBase64 = 
"AAAAAQAAAeKs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
+                       
"U2VyaWFsaXplcgAAAAAAAAACAgADTAAOcHJldmlvdXNTY2hlbWF0AEBMb3JnL2FwYWNoZS9mbGluay9m\n"
 +
+                       
"b3JtYXRzL2F2cm8vdHlwZXV0aWxzL1NlcmlhbGl6YWJsZUF2cm9TY2hlbWE7TAAGc2NoZW1hcQB+AAFM\n"
 +
+                       
"AAR0eXBldAARTGphdmEvbGFuZy9DbGFzczt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5\n"
 +
+                       
"cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA+b3JnLmFwYWNoZS5mbGluay5mb3Jt\n"
 +
+                       
"YXRzLmF2cm8udHlwZXV0aWxzLlNlcmlhbGl6YWJsZUF2cm9TY2hlbWEAAAAAAAAAAQMAAHhwdwEAeHNx\n"
 +
+                       
"AH4ABXcBAHh2cgAvb3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkLkFkZHJlc3Ps\n"
 +
+                       
"9qP4qOBDagwAAHhyACtvcmcuYXBhY2hlLmF2cm8uc3BlY2lmaWMuU3BlY2lmaWNSZWNvcmRCYXNlAqL5\n"
 +
+                       "rMa3NB0MAAB4cA==";
+
+               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
javaDeserialize(avroSerializerBase64);
+               assertSame(avroSerializer.getType(), Address.class);
+               assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
+       }
+
+       @Test
+       public void compositeSerializerFromFlink_1_6_WithNestedAvroSerializer() 
throws IOException {
+               String streamElementSerializerBase64 = 
"AAAAAQAAAq2s7QAFc3IAR29yZy5hcGFjaGUuZmxpbmsuc3RyZWFtaW5nLnJ1bnRpbWUuc3RyZWFtcmVj\n"
 +
+                       
"b3JkLlN0cmVhbUVsZW1lbnRTZXJpYWxpemVyAAAAAAAAAAECAAFMAA50eXBlU2VyaWFsaXplcnQANkxv\n"
 +
+                       
"cmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hyADRvcmcu\n"
 +
+                       
"YXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4\n"
 +
+                       
"cHNyADZvcmcuYXBhY2hlLmZsaW5rLmZvcm1hdHMuYXZyby50eXBldXRpbHMuQXZyb1NlcmlhbGl6ZXIA\n"
 +
+                       
"AAAAAAAAAQIAAkwADHNjaGVtYVN0cmluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO0wABHR5cGV0ABFMamF2\n"
 +
+                       
"YS9sYW5nL0NsYXNzO3hxAH4AAnQBAXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJBZGRyZXNzIiwibmFt\n"
 +
+                       
"ZXNwYWNlIjoib3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkIiwiZmllbGRzIjpb\n"
 +
+                       
"eyJuYW1lIjoibnVtIiwidHlwZSI6ImludCJ9LHsibmFtZSI6InN0cmVldCIsInR5cGUiOiJzdHJpbmci\n"
 +
+                       
"fSx7Im5hbWUiOiJjaXR5IiwidHlwZSI6InN0cmluZyJ9LHsibmFtZSI6InN0YXRlIiwidHlwZSI6InN0\n"
 +
+                       
"cmluZyJ9LHsibmFtZSI6InppcCIsInR5cGUiOiJzdHJpbmcifV19dnIAJW9yZy5hcGFjaGUuYXZyby5n\n"
 +
+                       "ZW5lcmljLkdlbmVyaWNSZWNvcmQAAAAAAAAAAAAAAHhw";
+
+               StreamElementSerializer<?> ser = (StreamElementSerializer<?>) 
javaDeserialize(streamElementSerializerBase64);
+               TypeSerializer<?> containedTypeSerializer = 
ser.getContainedTypeSerializer();
+
+               assertThat(containedTypeSerializer, 
instanceOf(AvroSerializer.class));
+
+               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
containedTypeSerializer;
+               assertSame(avroSerializer.getType(), GenericRecord.class);
+               assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
+       }
+
+       @Test
+       public void makeSureThatFieldsWereNotChanged() {
+               // This test should be removed once we completely migrate all 
the composite serializers.
+
+               List<String> serializedFieldNames = 
Arrays.stream(AvroSerializer.class.getDeclaredFields())
+                       .filter(field -> 
!Modifier.isTransient(field.getModifiers()))
+                       .filter(field -> 
!Modifier.isStatic(field.getModifiers()))
+                       .map(Field::getName)
+                       .sorted()
+                       .collect(Collectors.toList());
+
+               assertThat(serializedFieldNames, is(asList("previousSchema", 
"schema", "type")));
+       }
+
+       @SuppressWarnings("deprecation")
+       private static TypeSerializer<?> javaDeserialize(String base64) throws 
IOException {
+               byte[] bytes = Base64.getMimeDecoder().decode(base64);
+               DataInputDeserializer in = new DataInputDeserializer(bytes);
+               return TypeSerializerSerializationUtil.tryReadSerializer(in, 
Thread.currentThread().getContextClassLoader());
+       }
+
+       /**
+        * A simple pojo used in these tests.
+        */
+       public static class SimplePojo {
+               private String foo;
+
+               @SuppressWarnings("unused")
+               public String getFoo() {
+                       return foo;
+               }
+
+               @SuppressWarnings("unused")
+               public void setFoo(String foo) {
+                       this.foo = foo;
+               }
+       }
 }

Reply via email to