[FLINK-7420] [avro] Replace GenericData.Array by dummy when reading 
TypeSerializers

This also adds a new test that verifies that we correctly register
Avro Serializers when they are present and modifies an existing test to
verify that we correctly register dummy classes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29249b2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29249b2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29249b2e

Branch: refs/heads/master
Commit: 29249b2eeb9cb9910a5a55ae6c3a0b648d67d2b5
Parents: db7c70f
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Oct 25 17:38:24 2017 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Nov 3 16:40:34 2017 +0100

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |   8 ++
 .../flink-connector-kafka-0.11/pom.xml          |   8 ++
 .../flink-connector-kafka-0.8/pom.xml           |   8 ++
 .../flink-connector-kafka-0.9/pom.xml           |   8 ++
 .../TypeSerializerSerializationUtil.java        |  23 +++-
 ...ryoRegistrationSerializerConfigSnapshot.java |   2 +-
 .../kryo/KryoSerializerCompatibilityTest.java   | 125 +++++++++++++++++++
 .../type-with-avro-serialized-using-kryo        |   1 +
 .../type-without-avro-serialized-using-kryo     | Bin 0 -> 31 bytes
 .../AvroKryoSerializerRegistrationsTest.java    | 117 +++++++++++++++++
 .../test/resources/flink_11-kryo_registrations  |  86 +++++++++++++
 flink-libraries/flink-cep/pom.xml               |   8 --
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   4 +-
 .../misc/KryoSerializerRegistrationsTest.java   |  11 ++
 pom.xml                                         |  21 ++--
 15 files changed, 404 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml 
b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 2b6660d..3357591 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -95,6 +95,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-avro_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.11/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml 
b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 162d5d0..4f6be1d 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -104,6 +104,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-avro_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml 
b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index c990188..b96274a 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -83,6 +83,14 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-avro_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka_${scala.binary.version}</artifactId>
                        <version>${kafka.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml 
b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index 819d590..c711c5f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -91,6 +91,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-avro_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index 058ef46..d03498a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -74,7 +75,9 @@ public class TypeSerializerSerializationUtil {
 
        /**
         * An {@link ObjectInputStream} that ignores serialVersionUID 
mismatches when deserializing objects of
-        * anonymous classes or our Scala serializer classes.
+        * anonymous classes or our Scala serializer classes and also replaces 
occurences of GenericData.Array
+        * (from Avro) by a dummy class so that the KryoSerializer can still be 
deserialized without
+        * Avro being on the classpath.
         *
         * <p>The {@link TypeSerializerSerializationProxy} uses this specific 
object input stream to read serializers,
         * so that mismatching serialVersionUIDs of anonymous classes / Scala 
serializers are ignored.
@@ -83,9 +86,9 @@ public class TypeSerializerSerializationUtil {
         *
         * @see <a 
href="https://issues.apache.org/jira/browse/FLINK-6869">FLINK-6869</a>
         */
-       public static class SerialUIDMismatchTolerantInputStream extends 
InstantiationUtil.ClassLoaderObjectInputStream {
+       public static class FailureTolerantObjectInputStream extends 
InstantiationUtil.ClassLoaderObjectInputStream {
 
-               public SerialUIDMismatchTolerantInputStream(InputStream in, 
ClassLoader cl) throws IOException {
+               public FailureTolerantObjectInputStream(InputStream in, 
ClassLoader cl) throws IOException {
                        super(in, cl);
                }
 
@@ -93,6 +96,16 @@ public class TypeSerializerSerializationUtil {
                protected ObjectStreamClass readClassDescriptor() throws 
IOException, ClassNotFoundException {
                        ObjectStreamClass streamClassDescriptor = 
super.readClassDescriptor();
 
+                       try {
+                               Class.forName(streamClassDescriptor.getName(), 
false, classLoader);
+                       } catch (ClassNotFoundException e) {
+                               if 
(streamClassDescriptor.getName().equals("org.apache.avro.generic.GenericData$Array"))
 {
+                                       ObjectStreamClass result = 
ObjectStreamClass.lookup(
+                                               
KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class);
+                                       return result;
+                               }
+                       }
+
                        Class localClass = resolveClass(streamClassDescriptor);
                        if 
(scalaSerializerClassnames.contains(localClass.getName()) || 
localClass.isAnonymousClass()
                                // isAnonymousClass does not work for anonymous 
Scala classes; additionally check by classname
@@ -433,8 +446,8 @@ public class TypeSerializerSerializationUtil {
 
                        ClassLoader previousClassLoader = 
Thread.currentThread().getContextClassLoader();
                        try (
-                               SerialUIDMismatchTolerantInputStream ois =
-                                       new 
SerialUIDMismatchTolerantInputStream(new ByteArrayInputStream(buffer), 
userClassLoader)) {
+                               FailureTolerantObjectInputStream ois =
+                                       new 
FailureTolerantObjectInputStream(new ByteArrayInputStream(buffer), 
userClassLoader)) {
 
                                
Thread.currentThread().setContextClassLoader(userClassLoader);
                                typeSerializer = (TypeSerializer<T>) 
ois.readObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
index 14287ca..cdf6b23 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
@@ -217,7 +217,7 @@ public abstract class 
KryoRegistrationSerializerConfigSnapshot<T> extends Generi
        /**
         * Placeholder dummy for a previously registered class that can no 
longer be found in classpath on restore.
         */
-       public static class DummyRegisteredClass {}
+       public static class DummyRegisteredClass implements Serializable {}
 
        /**
         * Placeholder dummmy for a previously registered Kryo serializer that 
is no longer valid or in classpath on restore.

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 1cacc9e..11c95f1 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -29,14 +29,20 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
 import java.io.InputStream;
+import java.util.List;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -44,6 +50,9 @@ import static org.junit.Assert.assertTrue;
  */
 public class KryoSerializerCompatibilityTest {
 
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
        @Test
        public void testMigrationStrategyForRemovedAvroDependency() throws 
Exception {
                KryoSerializer<TestClass> kryoSerializerForA = new 
KryoSerializer<>(TestClass.class, new ExecutionConfig());
@@ -85,6 +94,122 @@ public class KryoSerializerCompatibilityTest {
                assertTrue(compatResult.isRequiresMigration());
        }
 
+       @Test
+       public void testMigrationOfTypeWithAvroType() throws Exception {
+
+               /*
+                When Avro sees the schema "{"type" : "array", "items" : 
"boolean"}" it will create a field
+                of type List<Integer> but the actual type will be 
GenericData.Array<Integer>. The
+                KryoSerializer registers a special Serializer for this type 
that simply deserializes
+                as ArrayList because Kryo cannot handle GenericData.Array 
well. Before Flink 1.4 Avro
+                was always in the classpath but after 1.4 it's only present if 
the flink-avro jar is
+                included. This test verifies that we can still deserialize 
data written pre-1.4.
+                */
+               class FakeAvroClass {
+                       public List<Integer> array;
+
+                       FakeAvroClass(List<Integer> array) {
+                               this.array = array;
+                       }
+               }
+
+               /*
+               // This has to be executed on a pre-1.4 branch to generate the 
binary blob
+               {
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       KryoSerializer<FakeAvroClass> kryoSerializer =
+                               new KryoSerializer<>(FakeAvroClass.class, 
executionConfig);
+
+                       try (
+                               FileOutputStream f = new FileOutputStream(
+                                       
"src/test/resources/type-with-avro-serialized-using-kryo");
+                               DataOutputViewStreamWrapper outputView = new 
DataOutputViewStreamWrapper(f)) {
+
+
+                               GenericData.Array<Integer> array =
+                                       new GenericData.Array<>(10, 
Schema.createArray(Schema.create(Schema.Type.INT)));
+
+                               array.add(10);
+                               array.add(20);
+                               array.add(30);
+
+                               FakeAvroClass myTestClass = new 
FakeAvroClass(array);
+
+                               kryoSerializer.serialize(myTestClass, 
outputView);
+                       }
+               }
+               */
+
+               {
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       KryoSerializer<FakeAvroClass> kryoSerializer =
+                               new KryoSerializer<>(FakeAvroClass.class, 
executionConfig);
+
+                       try (
+                               FileInputStream f = new 
FileInputStream("src/test/resources/type-with-avro-serialized-using-kryo");
+                               DataInputViewStreamWrapper inputView = new 
DataInputViewStreamWrapper(f)) {
+
+                               thrown.expectMessage("Could not find required 
Avro dependency");
+                               FakeAvroClass myTestClass = 
kryoSerializer.deserialize(inputView);
+                       }
+               }
+       }
+
+       @Test
+       public void testMigrationWithTypeDevoidOfAvroTypes() throws Exception {
+
+               class FakeClass {
+                       public List<Integer> array;
+
+                       FakeClass(List<Integer> array) {
+                               this.array = array;
+                       }
+               }
+
+               /*
+               // This has to be executed on a pre-1.4 branch to generate the 
binary blob
+               {
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       KryoSerializer<FakeClass> kryoSerializer =
+                               new KryoSerializer<>(FakeClass.class, 
executionConfig);
+
+                       try (
+                               FileOutputStream f = new FileOutputStream(
+                                       
"src/test/resources/type-without-avro-serialized-using-kryo");
+                               DataOutputViewStreamWrapper outputView = new 
DataOutputViewStreamWrapper(f)) {
+
+
+                               List<Integer> array = new ArrayList<>(10);
+
+                               array.add(10);
+                               array.add(20);
+                               array.add(30);
+
+                               FakeClass myTestClass = new FakeClass(array);
+
+                               kryoSerializer.serialize(myTestClass, 
outputView);
+                       }
+               }
+               */
+
+               {
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       KryoSerializer<FakeClass> kryoSerializer =
+                               new KryoSerializer<>(FakeClass.class, 
executionConfig);
+
+                       try (
+                               FileInputStream f = new 
FileInputStream("src/test/resources/type-without-avro-serialized-using-kryo");
+                               DataInputViewStreamWrapper inputView = new 
DataInputViewStreamWrapper(f)) {
+
+                               FakeClass myTestClass = 
kryoSerializer.deserialize(inputView);
+
+                               assertThat(myTestClass.array.get(0), is(10));
+                               assertThat(myTestClass.array.get(1), is(20));
+                               assertThat(myTestClass.array.get(2), is(30));
+                       }
+               }
+       }
+
        /**
         * Tests that after reconfiguration, registration ids are reconfigured 
to
         * remain the same as the preceding KryoSerializer.

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/resources/type-with-avro-serialized-using-kryo
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/type-with-avro-serialized-using-kryo 
b/flink-core/src/test/resources/type-with-avro-serialized-using-kryo
new file mode 100644
index 0000000..3901024
--- /dev/null
+++ b/flink-core/src/test/resources/type-with-avro-serialized-using-kryo
@@ -0,0 +1 @@
+
(<
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/resources/type-without-avro-serialized-using-kryo
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/resources/type-without-avro-serialized-using-kryo 
b/flink-core/src/test/resources/type-without-avro-serialized-using-kryo
new file mode 100644
index 0000000..d95094c
Binary files /dev/null and 
b/flink-core/src/test/resources/type-without-avro-serialized-using-kryo differ

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
new file mode 100644
index 0000000..060cfdd
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Registration;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that the set of Kryo registrations is the same across compatible
+ * Flink versions.
+ *
+ * <p>Special version of {@code KryoSerializerRegistrationsTest} that sits in 
the Avro module
+ * and verifies that we correctly register Avro types at the {@link 
KryoSerializer} when
+ * Avro is present.
+ */
+public class AvroKryoSerializerRegistrationsTest {
+
+       /**
+        * Tests that the registered classes in Kryo did not change.
+        *
+        * <p>Once we have proper serializer versioning this test will become 
obsolete.
+        * But currently a change in the serializers can break savepoint 
backwards
+        * compatibility between Flink versions.
+        */
+       @Test
+       public void testDefaultKryoRegisteredClassesDidNotChange() throws 
Exception {
+               final Kryo kryo = new KryoSerializer<>(Integer.class, new 
ExecutionConfig()).getKryo();
+
+               try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(
+                               
getClass().getClassLoader().getResourceAsStream("flink_11-kryo_registrations"))))
 {
+
+                       String line;
+                       while ((line = reader.readLine()) != null) {
+                               String[] split = line.split(",");
+                               final int tag = Integer.parseInt(split[0]);
+                               final String registeredClass = split[1];
+
+                               Registration registration = 
kryo.getRegistration(tag);
+
+                               if (registration == null) {
+                                       fail(String.format("Registration for %d 
= %s got lost", tag, registeredClass));
+                               }
+                               else if 
(!registeredClass.equals(registration.getType().getName())) {
+                                       fail(String.format("Registration for %d 
= %s changed to %s",
+                                                       tag, registeredClass, 
registration.getType().getName()));
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Creates a Kryo serializer and writes the default registrations out 
to a
+        * comma separated file with one entry per line:
+        *
+        * <pre>
+        * id,class
+        * </pre>
+        *
+        * <p>The produced file is used to check that the registered IDs don't 
change
+        * in future Flink versions.
+        *
+        * <p>This method is not used in the tests, but documents how the test 
file
+        * has been created and can be used to re-create it if needed.
+        *
+        * @param filePath File path to write registrations to
+        */
+       private void writeDefaultKryoRegistrations(String filePath) throws 
IOException {
+               final File file = new File(filePath);
+               if (file.exists()) {
+                       assertTrue(file.delete());
+               }
+
+               final Kryo kryo = new KryoSerializer<>(Integer.class, new 
ExecutionConfig()).getKryo();
+               final int nextId = kryo.getNextRegistrationId();
+
+               try (BufferedWriter writer = new BufferedWriter(new 
FileWriter(file))) {
+                       for (int i = 0; i < nextId; i++) {
+                               Registration registration = 
kryo.getRegistration(i);
+                               String str = registration.getId() + "," + 
registration.getType().getName();
+                               writer.write(str, 0, str.length());
+                               writer.newLine();
+                       }
+
+                       System.out.println("Created file with registrations at 
" + file.getAbsolutePath());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations 
b/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
new file mode 100644
index 0000000..7000e62
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
@@ -0,0 +1,86 @@
+0,int
+1,java.lang.String
+2,float
+3,boolean
+4,byte
+5,char
+6,short
+7,long
+8,double
+9,void
+10,scala.collection.convert.Wrappers$SeqWrapper
+11,scala.collection.convert.Wrappers$IteratorWrapper
+12,scala.collection.convert.Wrappers$MapWrapper
+13,scala.collection.convert.Wrappers$JListWrapper
+14,scala.collection.convert.Wrappers$JMapWrapper
+15,scala.Some
+16,scala.util.Left
+17,scala.util.Right
+18,scala.collection.immutable.Vector
+19,scala.collection.immutable.Set$Set1
+20,scala.collection.immutable.Set$Set2
+21,scala.collection.immutable.Set$Set3
+22,scala.collection.immutable.Set$Set4
+23,scala.collection.immutable.HashSet$HashTrieSet
+24,scala.collection.immutable.Map$Map1
+25,scala.collection.immutable.Map$Map2
+26,scala.collection.immutable.Map$Map3
+27,scala.collection.immutable.Map$Map4
+28,scala.collection.immutable.HashMap$HashTrieMap
+29,scala.collection.immutable.Range$Inclusive
+30,scala.collection.immutable.NumericRange$Inclusive
+31,scala.collection.immutable.NumericRange$Exclusive
+32,scala.collection.mutable.BitSet
+33,scala.collection.mutable.HashMap
+34,scala.collection.mutable.HashSet
+35,scala.collection.convert.Wrappers$IterableWrapper
+36,scala.Tuple1
+37,scala.Tuple2
+38,scala.Tuple3
+39,scala.Tuple4
+40,scala.Tuple5
+41,scala.Tuple6
+42,scala.Tuple7
+43,scala.Tuple8
+44,scala.Tuple9
+45,scala.Tuple10
+46,scala.Tuple11
+47,scala.Tuple12
+48,scala.Tuple13
+49,scala.Tuple14
+50,scala.Tuple15
+51,scala.Tuple16
+52,scala.Tuple17
+53,scala.Tuple18
+54,scala.Tuple19
+55,scala.Tuple20
+56,scala.Tuple21
+57,scala.Tuple22
+58,scala.Tuple1$mcJ$sp
+59,scala.Tuple1$mcI$sp
+60,scala.Tuple1$mcD$sp
+61,scala.Tuple2$mcJJ$sp
+62,scala.Tuple2$mcJI$sp
+63,scala.Tuple2$mcJD$sp
+64,scala.Tuple2$mcIJ$sp
+65,scala.Tuple2$mcII$sp
+66,scala.Tuple2$mcID$sp
+67,scala.Tuple2$mcDJ$sp
+68,scala.Tuple2$mcDI$sp
+69,scala.Tuple2$mcDD$sp
+70,scala.Symbol
+71,scala.reflect.ClassTag
+72,scala.runtime.BoxedUnit
+73,java.util.Arrays$ArrayList
+74,java.util.BitSet
+75,java.util.PriorityQueue
+76,java.util.regex.Pattern
+77,java.sql.Date
+78,java.sql.Time
+79,java.sql.Timestamp
+80,java.net.URI
+81,java.net.InetSocketAddress
+82,java.util.UUID
+83,java.util.Locale
+84,java.text.SimpleDateFormat
+85,org.apache.avro.generic.GenericData$Array

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml 
b/flink-libraries/flink-cep/pom.xml
index bd57d17..a561cca 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -88,14 +88,6 @@ under the License.
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
-
-        <!-- we include Avro to make the CEPMigrationTest work, it uses a 
Kryo-serialized savepoint (see FLINK-7420) -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
     
     <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index dc322c3..c333397 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -168,8 +168,8 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                        DataInputViewStream dis = new DataInputViewStream(in);
                        ClassLoader previousClassLoader = 
Thread.currentThread().getContextClassLoader();
                        try (
-                               TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream ois =
-                                       new TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream(dis, userCodeClassLoader)) {
+                               TypeSerializerSerializationUtil.FailureTolerantObjectInputStream ois =
+                                       new TypeSerializerSerializationUtil.FailureTolerantObjectInputStream(dis, userCodeClassLoader)) {
 
                                
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
                                TypeSerializer<S> stateSerializer = 
(TypeSerializer<S>) ois.readObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
index 77d2a1a..cbe9394 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
@@ -33,6 +33,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -72,6 +74,15 @@ public class KryoSerializerRegistrationsTest {
                                if (registration == null) {
                                        fail(String.format("Registration for %d = %s got lost", tag, registeredClass));
                                }
+                               else if (registeredClass.equals("org.apache.avro.generic.GenericData$Array")) {
+                                       // starting with Flink 1.4 Avro is no longer a dependency of core. Avro is
+                                       // only available if flink-avro is present. There is a special version of
+                                       // this test in AvroKryoSerializerRegistrationsTest that verifies correct
+                                       // registration of Avro types if present
+                                       assertThat(
+                                               registration.getType().getName(),
+                                               is("org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass"));
+                               }
                                else if (!registeredClass.equals(registration.getType().getName())) {
                                        fail(String.format("Registration for %d = %s changed to %s",
                                                        tag, registeredClass, registration.getType().getName()));

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 773dc34..b93251b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -215,11 +215,11 @@ under the License.
        </dependencies>
 
        <!-- this section defines the module versions that are used if nothing 
else is specified. -->
-       
+
        <dependencyManagement>
-               <!-- WARN: 
-                       DO NOT put      guava, 
-                                               protobuf, 
+               <!-- WARN:
+                       DO NOT put      guava,
+                                               protobuf,
                                                asm,
                                                netty
                                        here. It will overwrite Hadoop's guava 
dependency (even though we handle it
@@ -367,7 +367,7 @@ under the License.
                                <artifactId>joda-convert</artifactId>
                                <version>1.7</version>
                        </dependency>
-                       
+
                        <!-- kryo used in different versions by Flink an chill 
-->
                        <dependency>
                                <groupId>com.esotericsoftware.kryo</groupId>
@@ -579,7 +579,7 @@ under the License.
                                                                        
<outputDir>${project.build.directory}/spotbugs</outputDir>
                                                                        <!-- A 
list of available stylesheets can be found here: 
https://github.com/findbugsproject/findbugs/tree/master/findbugs/src/xsl -->
                                                                        
<stylesheet>plain.xsl</stylesheet>
-                                                                       
+
                                                                        
<fileMappers>
                                                                                
<fileMapper
                                                                                
        
implementation="org.codehaus.plexus.components.io.filemappers.FileExtensionMapper">
@@ -772,7 +772,7 @@ under the License.
                                </plugins>
                        </build>
                </profile>
-               
+
                <profile>
                        <!--japicmp 0.7 does not support deactivation from the 
command
                                line, so we have to use a workaround with 
profiles instead.
@@ -842,7 +842,7 @@ under the License.
                                </dependency>
                        </dependencies>
                </profile>
-               
+
                <profile>
                        <id>release</id>
                        <properties>
@@ -1027,6 +1027,7 @@ under the License.
 
                                                <!-- Test Data. -->
                                                
<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
+                        
<exclude>flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations</exclude>
                                                
<exclude>flink-runtime/src/test/resources/flink_11-kryo_registrations</exclude>
                                                
<exclude>flink-core/src/test/resources/kryo-serializer-config-snapshot-v1</exclude>
                                                
<exclude>flink-formats/flink-avro/src/test/resources/avro/*.avsc</exclude>
@@ -1287,7 +1288,7 @@ under the License.
                        </plugin>
                </plugins>
 
-               <!-- Plugin configurations for plugins activated in 
sub-projects --> 
+               <!-- Plugin configurations for plugins activated in 
sub-projects -->
 
                <pluginManagement>
                        <plugins>
@@ -1310,7 +1311,7 @@ under the License.
                                        
<artifactId>maven-shade-plugin</artifactId>
                                        <version>2.4.1</version>
                                </plugin>
-                               
+
                                <!-- Disable certain plugins in Eclipse -->
                                <plugin>
                                        <groupId>org.eclipse.m2e</groupId>

Reply via email to