[FLINK-6869] [core] Tolerate serialVersionUID mismatches for Scala and 
anonymous serializers

This commit makes the TypeSerializerSerializationProxy tolerant of
serialVersionUID mismatches when reading serializers that are anonymous
classes or one of our Scala serializers.

Our Scala serializers require this because they are generated at compile
time by Scala macros, and it is therefore not possible to fix a specific
serialVersionUID for them. For non-generated Scala serializers, we still
need this because their pre-1.3 serialVersionUIDs may vary depending on
the Scala version used.

This can be seen as a workaround, and should be reverted once 1.2
savepoint compatibility is no longer maintained.

This commit also updates the streaming state docs to advise users to
avoid using anonymous classes for their state serializers.

This closes #4090.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b216a4a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b216a4a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b216a4a0

Branch: refs/heads/master
Commit: b216a4a0acf4e4d0463c3ed961d6a0258223491a
Parents: 75ea808
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sat Jun 10 22:41:35 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 06:37:01 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/state.md                        |   7 +
 .../TypeSerializerSerializationUtil.java        |  82 +++++++++-
 .../TypeSerializerSerializationUtilTest.java    | 158 ++++++++++++++++++-
 ...ckendStateMetaInfoSnapshotReaderWriters.java |  14 +-
 4 files changed, 254 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 97f0c29..0025fae 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -453,6 +453,13 @@ ListStateDescriptor<Tuple2<String, Integer>> descriptor =
 checkpointedState = getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
 
+Note that Flink writes state serializers along with the state as metadata. In 
certain cases on restore (see following
+subsections), the written serializer needs to be deserialized and used. 
Therefore, it is recommended to avoid using
+anonymous classes as your state serializers. Anonymous classes do not have a 
guarantee on the generated classname,
+which varies across compilers and depends on the order in which they are instantiated 
within the enclosing class. This can 
+easily cause the previously written serializer to be unreadable (since the 
original class can no longer be found in the
+classpath).
+
 ### Handling serializer upgrades and compatibility
 
 Flink allows changing the serializers used to read and write managed state, so 
that users are not locked in to any

http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index 3d79d9a..058ef46 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -32,10 +32,16 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InvalidClassException;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Utility methods for serialization of {@link TypeSerializer} and {@link 
TypeSerializerConfigSnapshot}.
@@ -46,6 +52,67 @@ public class TypeSerializerSerializationUtil {
        private static final Logger LOG = 
LoggerFactory.getLogger(TypeSerializerSerializationUtil.class);
 
        /**
+        * This is maintained as a temporary workaround for FLINK-6869.
+        *
+        * <p>Before 1.3, the Scala serializers did not specify the 
serialVersionUID.
+        * Although since 1.3 they are properly specified, we still have to 
ignore them for now
+        * as their previous serialVersionUIDs will vary depending on the Scala 
version.
+        *
+        * <p>This can be removed once 1.2 is no longer supported.
+        */
+       private static Set<String> scalaSerializerClassnames = new HashSet<>();
+       static {
+               
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
+               
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
+               
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
+               
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
+               
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
+               
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
+               
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
+               
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
+       }
+
+       /**
+        * An {@link ObjectInputStream} that ignores serialVersionUID 
mismatches when deserializing objects of
+        * anonymous classes or our Scala serializer classes.
+        *
+        * <p>The {@link TypeSerializerSerializationProxy} uses this specific 
object input stream to read serializers,
+        * so that mismatching serialVersionUIDs of anonymous classes / Scala 
serializers are ignored.
+        * This is a required workaround to maintain backwards compatibility 
for our pre-1.3 Scala serializers.
+        * See FLINK-6869 for details.
+        *
+        * @see <a 
href="https://issues.apache.org/jira/browse/FLINK-6869">FLINK-6869</a>
+        */
+       public static class SerialUIDMismatchTolerantInputStream extends 
InstantiationUtil.ClassLoaderObjectInputStream {
+
+               public SerialUIDMismatchTolerantInputStream(InputStream in, 
ClassLoader cl) throws IOException {
+                       super(in, cl);
+               }
+
+               @Override
+               protected ObjectStreamClass readClassDescriptor() throws 
IOException, ClassNotFoundException {
+                       ObjectStreamClass streamClassDescriptor = 
super.readClassDescriptor();
+
+                       Class localClass = resolveClass(streamClassDescriptor);
+                       if 
(scalaSerializerClassnames.contains(localClass.getName()) || 
localClass.isAnonymousClass()
+                               // isAnonymousClass does not work for anonymous 
Scala classes; additionally check by classname
+                               || localClass.getName().contains("$anon$") || 
localClass.getName().contains("$anonfun")) {
+
+                               ObjectStreamClass localClassDescriptor = 
ObjectStreamClass.lookup(localClass);
+                               if (localClassDescriptor != null
+                                       && 
localClassDescriptor.getSerialVersionUID() != 
streamClassDescriptor.getSerialVersionUID()) {
+                                       LOG.warn("Ignoring serialVersionUID 
mismatch for anonymous class {}; was {}, now {}.",
+                                               
streamClassDescriptor.getName(), streamClassDescriptor.getSerialVersionUID(), 
localClassDescriptor.getSerialVersionUID());
+
+                                       streamClassDescriptor = 
localClassDescriptor;
+                               }
+                       }
+
+                       return streamClassDescriptor;
+               }
+       }
+
+       /**
         * Writes a {@link TypeSerializer} to the provided data output view.
         *
         * <p>It is written with a format that can be later read again using
@@ -354,6 +421,7 @@ public class TypeSerializerSerializationUtil {
                        }
                }
 
+               @SuppressWarnings("unchecked")
                @Override
                public void read(DataInputView in) throws IOException {
                        super.read(in);
@@ -362,8 +430,14 @@ public class TypeSerializerSerializationUtil {
                        int serializerBytes = in.readInt();
                        byte[] buffer = new byte[serializerBytes];
                        in.readFully(buffer);
-                       try {
-                               typeSerializer = 
InstantiationUtil.deserializeObject(buffer, userClassLoader);
+
+                       ClassLoader previousClassLoader = 
Thread.currentThread().getContextClassLoader();
+                       try (
+                               SerialUIDMismatchTolerantInputStream ois =
+                                       new 
SerialUIDMismatchTolerantInputStream(new ByteArrayInputStream(buffer), 
userClassLoader)) {
+
+                               
Thread.currentThread().setContextClassLoader(userClassLoader);
+                               typeSerializer = (TypeSerializer<T>) 
ois.readObject();
                        } catch (ClassNotFoundException | InvalidClassException 
e) {
                                if (useDummyPlaceholder) {
                                        // we create a dummy so that all the 
information is not lost when we get a new checkpoint before receiving
@@ -372,8 +446,10 @@ public class TypeSerializerSerializationUtil {
                                                new 
UnloadableDummyTypeSerializer<>(buffer);
                                        LOG.warn("Could not find requested 
TypeSerializer class in classpath. Created dummy.", e);
                                } else {
-                                       throw new IOException("Missing class 
for type serializer.", e);
+                                       throw new IOException("Unloadable class 
for type serializer.", e);
                                }
+                       } finally {
+                               
Thread.currentThread().setContextClassLoader(previousClassLoader);
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
index 738644b..10df619 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
@@ -29,7 +29,9 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -39,8 +41,11 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InvalidClassException;
+import java.io.ObjectStreamClass;
+import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
@@ -55,7 +60,10 @@ import static org.mockito.Mockito.mock;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(TypeSerializerSerializationUtil.class)
-public class TypeSerializerSerializationUtilTest {
+public class TypeSerializerSerializationUtilTest implements Serializable {
+
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
        /**
         * Verifies that reading and writing serializers work correctly.
@@ -236,6 +244,36 @@ public class TypeSerializerSerializationUtilTest {
                
Assert.assertEquals(DoubleSerializer.INSTANCE.snapshotConfiguration(), 
restored.get(1).f1);
        }
 
+       /**
+        * Verifies that serializers of anonymous classes can be deserialized, 
even if serialVersionUID changes.
+        */
+       @Test
+       public void testAnonymousSerializerClassWithChangedSerialVersionUID() 
throws Exception {
+
+               TypeSerializer anonymousClassSerializer = new 
AbstractIntSerializer() {};
+               // assert that our assumption holds
+               
Assert.assertTrue(anonymousClassSerializer.getClass().isAnonymousClass());
+
+               byte[] anonymousSerializerBytes;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerSerializationUtil.writeSerializer(new 
DataOutputViewStreamWrapper(out), anonymousClassSerializer);
+                       anonymousSerializerBytes = out.toByteArray();
+               }
+
+               long newSerialVersionUID = 1234567L;
+               // assert that we're actually modifying to a different 
serialVersionUID
+               
Assert.assertNotEquals(ObjectStreamClass.lookup(anonymousClassSerializer.getClass()).getSerialVersionUID(),
 newSerialVersionUID);
+               modifySerialVersionUID(anonymousSerializerBytes, 
anonymousClassSerializer.getClass().getName(), newSerialVersionUID);
+
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(anonymousSerializerBytes)) {
+                       anonymousClassSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(new 
DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+               }
+
+               // serializer should have been deserialized despite 
serialVersionUID mismatch
+               Assert.assertNotNull(anonymousClassSerializer);
+               
Assert.assertTrue(anonymousClassSerializer.getClass().isAnonymousClass());
+       }
+
        public static class TestConfigSnapshot extends 
TypeSerializerConfigSnapshot {
 
                static final int VERSION = 1;
@@ -292,4 +330,122 @@ public class TypeSerializerSerializationUtilTest {
                        return 31 * val + msg.hashCode();
                }
        }
+
+       private static void modifySerialVersionUID(byte[] objectBytes, String 
classname, long newSerialVersionUID) throws Exception {
+               byte[] classnameBytes = classname.getBytes();
+
+               // serialVersionUID follows directly after classname in the 
object byte stream;
+               // advance serialVersionUIDPosition until end of classname in 
stream
+               int serialVersionUIDOffset;
+               boolean foundClass = false;
+               int numMatchedBytes = 0;
+               for (serialVersionUIDOffset = 0; serialVersionUIDOffset < 
objectBytes.length; serialVersionUIDOffset++) {
+                       if (objectBytes[serialVersionUIDOffset] == 
classnameBytes[numMatchedBytes]) {
+                               numMatchedBytes++;
+                               foundClass = true;
+                       } else {
+                               if (objectBytes[serialVersionUIDOffset] == 
classnameBytes[0]) {
+                                       numMatchedBytes = 1;
+                               } else {
+                                       numMatchedBytes = 0;
+                                       foundClass = false;
+                               }
+                       }
+
+                       if (numMatchedBytes == classnameBytes.length) {
+                               break;
+                       }
+               }
+
+               if (!foundClass) {
+                       throw new RuntimeException("Could not find class " + 
classname + " in object byte stream.");
+               }
+
+               byte[] newUIDBytes = ByteBuffer.allocate(Long.SIZE / 
Byte.SIZE).putLong(newSerialVersionUID).array();
+
+               // replace original serialVersionUID bytes with new 
serialVersionUID bytes
+               for (int uidIndex = 0; uidIndex < newUIDBytes.length; 
uidIndex++) {
+                       objectBytes[serialVersionUIDOffset + 1 + uidIndex] = 
newUIDBytes[uidIndex];
+               }
+       }
+
+       public static abstract class AbstractIntSerializer extends 
TypeSerializer<Integer> {
+
+               public static final long serialVersionUID = 1;
+
+               @Override
+               public Integer createInstance() {
+                       return IntSerializer.INSTANCE.createInstance();
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return IntSerializer.INSTANCE.isImmutableType();
+               }
+
+               @Override
+               public Integer copy(Integer from) {
+                       return IntSerializer.INSTANCE.copy(from);
+               }
+
+               @Override
+               public Integer copy(Integer from, Integer reuse) {
+                       return IntSerializer.INSTANCE.copy(from, reuse);
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       IntSerializer.INSTANCE.copy(source, target);
+               }
+
+               @Override
+               public Integer deserialize(DataInputView source) throws 
IOException {
+                       return IntSerializer.INSTANCE.deserialize(source);
+               }
+
+               @Override
+               public Integer deserialize(Integer reuse, DataInputView source) 
throws IOException {
+                       return IntSerializer.INSTANCE.deserialize(reuse, 
source);
+               }
+
+               @Override
+               public void serialize(Integer record, DataOutputView target) 
throws IOException {
+                       IntSerializer.INSTANCE.serialize(record, target);
+               }
+
+               @Override
+               public TypeSerializer<Integer> duplicate() {
+                       return IntSerializer.INSTANCE.duplicate();
+               }
+
+               @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       return IntSerializer.INSTANCE.snapshotConfiguration();
+               }
+
+               @Override
+               public CompatibilityResult<Integer> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       return 
IntSerializer.INSTANCE.ensureCompatibility(configSnapshot);
+               }
+
+               @Override
+               public int getLength() {
+                       return IntSerializer.INSTANCE.getLength();
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return IntSerializer.INSTANCE.canEqual(obj);
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return IntSerializer.INSTANCE.equals(obj);
+               }
+
+               @Override
+               public int hashCode() {
+                       return IntSerializer.INSTANCE.hashCode();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index e52323f..dc322c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,6 +156,7 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                        super(userCodeClassLoader);
                }
 
+               @SuppressWarnings("unchecked")
                @Override
                public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readStateMetaInfo(DataInputView in) throws IOException {
                        RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
stateMetaInfo =
@@ -164,12 +164,20 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
 
                        stateMetaInfo.setName(in.readUTF());
                        
stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
                        DataInputViewStream dis = new DataInputViewStream(in);
-                       try {
-                               TypeSerializer<S> stateSerializer = 
InstantiationUtil.deserializeObject(dis, userCodeClassLoader);
+                       ClassLoader previousClassLoader = 
Thread.currentThread().getContextClassLoader();
+                       try (
+                               
TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream ois =
+                                       new 
TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream(dis, 
userCodeClassLoader)) {
+
+                               
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
+                               TypeSerializer<S> stateSerializer = 
(TypeSerializer<S>) ois.readObject();
                                
stateMetaInfo.setPartitionStateSerializer(stateSerializer);
                        } catch (ClassNotFoundException exception) {
                                throw new IOException(exception);
+                       } finally {
+                               
Thread.currentThread().setContextClassLoader(previousClassLoader);
                        }
 
                        // old versions do not contain the partition state 
serializer's configuration snapshot

Reply via email to