This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 56e45c8397bdaf3a8dec45164021c54db5b47e7e
Author: TsReaper <[email protected]>
AuthorDate: Thu Jul 18 18:47:42 2019 +0800

    [FLINK-13322][table-runtime-blink] Fix serializer snapshot recovery in BaseArray and BaseMap serializers
---
 .../flink/table/types/InternalSerializers.java     |   4 +-
 .../flink/table/typeutils/BaseArraySerializer.java |  26 +++-
 .../flink/table/typeutils/BaseMapSerializer.java   |  59 +++++++--
 .../apache/flink/table/dataformat/BaseRowTest.java |   3 +-
 .../flink/table/dataformat/BinaryArrayTest.java    |   4 +-
 .../table/typeutils/BaseArraySerializerTest.java   |  57 ++++++++
 .../table/typeutils/BaseMapSerializerTest.java     |  66 +++++++++-
 .../flink/table/typeutils/SerializerTestUtil.java  | 143 +++++++++++++++++++++
 8 files changed, 337 insertions(+), 25 deletions(-)

diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
index e6eaf81..c8d5b76 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
@@ -80,9 +80,9 @@ public class InternalSerializers {
                                return new BaseArraySerializer(((ArrayType) 
type).getElementType(), config);
                        case MAP:
                                MapType mapType = (MapType) type;
-                               return new 
BaseMapSerializer(mapType.getKeyType(), mapType.getValueType());
+                               return new 
BaseMapSerializer(mapType.getKeyType(), mapType.getValueType(), config);
                        case MULTISET:
-                               return new BaseMapSerializer(((MultisetType) 
type).getElementType(), new IntType());
+                               return new BaseMapSerializer(((MultisetType) 
type).getElementType(), new IntType(), config);
                        case ROW:
                                RowType rowType = (RowType) type;
                                return new BaseRowSerializer(config, rowType);
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
index d8bf95e..c749dfa 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.table.typeutils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -216,9 +217,14 @@ public class BaseArraySerializer extends 
TypeSerializer<BaseArray> {
                return eleType.hashCode();
        }
 
+       @VisibleForTesting
+       public TypeSerializer getEleSer() {
+               return eleSer;
+       }
+
        @Override
        public TypeSerializerSnapshot<BaseArray> snapshotConfiguration() {
-               return new BaseArraySerializerSnapshot(eleType);
+               return new BaseArraySerializerSnapshot(eleType, eleSer);
        }
 
        /**
@@ -228,14 +234,16 @@ public class BaseArraySerializer extends 
TypeSerializer<BaseArray> {
                private static final int CURRENT_VERSION = 3;
 
                private LogicalType previousType;
+               private TypeSerializer previousEleSer;
 
                @SuppressWarnings("unused")
                public BaseArraySerializerSnapshot() {
                        // this constructor is used when restoring from a 
checkpoint/savepoint.
                }
 
-               BaseArraySerializerSnapshot(LogicalType eleType) {
+               BaseArraySerializerSnapshot(LogicalType eleType, TypeSerializer 
eleSer) {
                        this.previousType = eleType;
+                       this.previousEleSer = eleSer;
                }
 
                @Override
@@ -245,14 +253,17 @@ public class BaseArraySerializer extends 
TypeSerializer<BaseArray> {
 
                @Override
                public void writeSnapshot(DataOutputView out) throws 
IOException {
-                       InstantiationUtil.serializeObject(new 
DataOutputViewStream(out), previousType);
+                       DataOutputViewStream outStream = new 
DataOutputViewStream(out);
+                       InstantiationUtil.serializeObject(outStream, 
previousType);
+                       InstantiationUtil.serializeObject(outStream, 
previousEleSer);
                }
 
                @Override
                public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) throws IOException {
                        try {
-                               this.previousType = 
InstantiationUtil.deserializeObject(
-                                               new DataInputViewStream(in), 
userCodeClassLoader);
+                               DataInputViewStream inStream = new 
DataInputViewStream(in);
+                               this.previousType = 
InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+                               this.previousEleSer = 
InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
                        } catch (ClassNotFoundException e) {
                                throw new IOException(e);
                        }
@@ -260,7 +271,7 @@ public class BaseArraySerializer extends 
TypeSerializer<BaseArray> {
 
                @Override
                public TypeSerializer<BaseArray> restoreSerializer() {
-                       return new BaseArraySerializer(previousType, new 
ExecutionConfig());
+                       return new BaseArraySerializer(previousType, 
previousEleSer);
                }
 
                @Override
@@ -270,7 +281,8 @@ public class BaseArraySerializer extends 
TypeSerializer<BaseArray> {
                        }
 
                        BaseArraySerializer newBaseArraySerializer = 
(BaseArraySerializer) newSerializer;
-                       if 
(!previousType.equals(newBaseArraySerializer.eleType)) {
+                       if 
(!previousType.equals(newBaseArraySerializer.eleType) ||
+                               
!previousEleSer.equals(newBaseArraySerializer.eleSer)) {
                                return 
TypeSerializerSchemaCompatibility.incompatible();
                        } else {
                                return 
TypeSerializerSchemaCompatibility.compatibleAsIs();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
index 01dba56..766c3b5 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.table.typeutils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -62,12 +63,21 @@ public class BaseMapSerializer extends 
TypeSerializer<BaseMap> {
        private transient BinaryArrayWriter reuseKeyWriter;
        private transient BinaryArrayWriter reuseValueWriter;
 
-       public BaseMapSerializer(LogicalType keyType, LogicalType valueType) {
+       public BaseMapSerializer(LogicalType keyType, LogicalType valueType, 
ExecutionConfig conf) {
                this.keyType = keyType;
                this.valueType = valueType;
 
-               this.keySerializer = InternalSerializers.create(keyType, new 
ExecutionConfig());
-               this.valueSerializer = InternalSerializers.create(valueType, 
new ExecutionConfig());
+               this.keySerializer = InternalSerializers.create(keyType, conf);
+               this.valueSerializer = InternalSerializers.create(valueType, 
conf);
+       }
+
+       private BaseMapSerializer(
+                       LogicalType keyType, LogicalType valueType, 
TypeSerializer keySerializer, TypeSerializer valueSerializer) {
+               this.keyType = keyType;
+               this.valueType = valueType;
+
+               this.keySerializer = keySerializer;
+               this.valueSerializer = valueSerializer;
        }
 
        @Override
@@ -77,7 +87,7 @@ public class BaseMapSerializer extends 
TypeSerializer<BaseMap> {
 
        @Override
        public TypeSerializer<BaseMap> duplicate() {
-               return new BaseMapSerializer(keyType, valueType);
+               return new BaseMapSerializer(keyType, valueType, 
keySerializer.duplicate(), valueSerializer.duplicate());
        }
 
        @Override
@@ -217,9 +227,19 @@ public class BaseMapSerializer extends 
TypeSerializer<BaseMap> {
                return result;
        }
 
+       @VisibleForTesting
+       public TypeSerializer getKeySerializer() {
+               return keySerializer;
+       }
+
+       @VisibleForTesting
+       public TypeSerializer getValueSerializer() {
+               return valueSerializer;
+       }
+
        @Override
        public TypeSerializerSnapshot<BaseMap> snapshotConfiguration() {
-               return new BaseMapSerializerSnapshot(keyType, valueType);
+               return new BaseMapSerializerSnapshot(keyType, valueType, 
keySerializer, valueSerializer);
        }
 
        /**
@@ -231,14 +251,20 @@ public class BaseMapSerializer extends 
TypeSerializer<BaseMap> {
                private LogicalType previousKeyType;
                private LogicalType previousValueType;
 
+               private TypeSerializer previousKeySerializer;
+               private TypeSerializer previousValueSerializer;
+
                @SuppressWarnings("unused")
                public BaseMapSerializerSnapshot() {
                        // this constructor is used when restoring from a 
checkpoint/savepoint.
                }
 
-               BaseMapSerializerSnapshot(LogicalType keyT, LogicalType valueT) 
{
+               BaseMapSerializerSnapshot(LogicalType keyT, LogicalType valueT, 
TypeSerializer keySer, TypeSerializer valueSer) {
                        this.previousKeyType = keyT;
                        this.previousValueType = valueT;
+
+                       this.previousKeySerializer = keySer;
+                       this.previousValueSerializer = valueSer;
                }
 
                @Override
@@ -248,15 +274,21 @@ public class BaseMapSerializer extends 
TypeSerializer<BaseMap> {
 
                @Override
                public void writeSnapshot(DataOutputView out) throws 
IOException {
-                       InstantiationUtil.serializeObject(new 
DataOutputViewStream(out), previousKeyType);
-                       InstantiationUtil.serializeObject(new 
DataOutputViewStream(out), previousValueType);
+                       DataOutputViewStream outStream = new 
DataOutputViewStream(out);
+                       InstantiationUtil.serializeObject(outStream, 
previousKeyType);
+                       InstantiationUtil.serializeObject(outStream, 
previousValueType);
+                       InstantiationUtil.serializeObject(outStream, 
previousKeySerializer);
+                       InstantiationUtil.serializeObject(outStream, 
previousValueSerializer);
                }
 
                @Override
                public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) throws IOException {
                        try {
-                               this.previousKeyType = 
InstantiationUtil.deserializeObject(new DataInputViewStream(in), 
userCodeClassLoader);
-                               this.previousValueType = 
InstantiationUtil.deserializeObject(new DataInputViewStream(in), 
userCodeClassLoader);
+                               DataInputViewStream inStream = new 
DataInputViewStream(in);
+                               this.previousKeyType = 
InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+                               this.previousValueType = 
InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+                               this.previousKeySerializer = 
InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+                               this.previousValueSerializer = 
InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
                        } catch (ClassNotFoundException e) {
                                throw new IOException(e);
                        }
@@ -264,7 +296,8 @@ public class BaseMapSerializer extends 
TypeSerializer<BaseMap> {
 
                @Override
                public TypeSerializer<BaseMap> restoreSerializer() {
-                       return new BaseMapSerializer(previousKeyType, 
previousValueType);
+                       return new BaseMapSerializer(
+                               previousKeyType, previousValueType, 
previousKeySerializer, previousValueSerializer);
                }
 
                @Override
@@ -275,7 +308,9 @@ public class BaseMapSerializer extends 
TypeSerializer<BaseMap> {
 
                        BaseMapSerializer newBaseMapSerializer = 
(BaseMapSerializer) newSerializer;
                        if 
(!previousKeyType.equals(newBaseMapSerializer.keyType) ||
-                                       
!previousValueType.equals(newBaseMapSerializer.valueType)) {
+                               
!previousValueType.equals(newBaseMapSerializer.valueType) ||
+                               
!previousKeySerializer.equals(newBaseMapSerializer.keySerializer) ||
+                               
!previousValueSerializer.equals(newBaseMapSerializer.valueSerializer)) {
                                return 
TypeSerializerSchemaCompatibility.incompatible();
                        } else {
                                return 
TypeSerializerSchemaCompatibility.compatibleAsIs();
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
index 43bc6a6..8723405 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
@@ -102,7 +102,8 @@ public class BaseRowTest {
                writer.writeDecimal(10, decimal1, 5);
                writer.writeDecimal(11, decimal2, 20);
                writer.writeArray(12, array, new 
BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
-               writer.writeMap(13, map, new 
BaseMapSerializer(DataTypes.INT().getLogicalType(), 
DataTypes.INT().getLogicalType()));
+               writer.writeMap(13, map, new BaseMapSerializer(
+                       DataTypes.INT().getLogicalType(), 
DataTypes.INT().getLogicalType(), null));
                writer.writeRow(14, underRow, new BaseRowSerializer(null, 
RowType.of(new IntType(), new IntType())));
                writer.writeBinary(15, bytes);
                return row;
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
index 54d0802..5063646 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -325,7 +325,7 @@ public class BinaryArrayTest {
                        BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 8);
                        writer.setNullAt(0);
                        writer.writeMap(1, BinaryMap.valueOf(subArray, 
subArray),
-                                       new 
BaseMapSerializer(DataTypes.INT().getLogicalType(), 
DataTypes.INT().getLogicalType()));
+                                       new 
BaseMapSerializer(DataTypes.INT().getLogicalType(), 
DataTypes.INT().getLogicalType(), null));
                        writer.complete();
 
                        assertTrue(array.isNullAt(0));
@@ -358,7 +358,7 @@ public class BinaryArrayTest {
                BinaryRow row = new BinaryRow(1);
                BinaryRowWriter rowWriter = new BinaryRowWriter(row);
                rowWriter.writeMap(0, binaryMap,
-                               new 
BaseMapSerializer(DataTypes.INT().getLogicalType(), 
DataTypes.INT().getLogicalType()));
+                               new 
BaseMapSerializer(DataTypes.INT().getLogicalType(), 
DataTypes.INT().getLogicalType(), null));
                rowWriter.complete();
 
                BinaryMap map = (BinaryMap) row.getMap(0);
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
index feb9893..f7f0e31 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
@@ -19,15 +19,31 @@
 package org.apache.flink.table.typeutils;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.dataformat.BaseArray;
 import org.apache.flink.table.dataformat.BinaryArray;
 import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryGeneric;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.dataformat.GenericArray;
 import org.apache.flink.testutils.DeeplyEqualsChecker;
 
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObj;
+import static 
org.apache.flink.table.typeutils.SerializerTestUtil.MyObjSerializer;
+import static 
org.apache.flink.table.typeutils.SerializerTestUtil.snapshotAndReconfigure;
+import static org.junit.Assert.assertEquals;
+
 /**
  * A test for the {@link BaseArraySerializer}.
  */
@@ -58,6 +74,47 @@ public class BaseArraySerializerTest extends 
SerializerTestBase<BaseArray> {
                ));
        }
 
+       @Test
+       public void testExecutionConfigWithKryo() throws Exception {
+               // serialize base array
+               ExecutionConfig config = new ExecutionConfig();
+               config.enableForceKryo();
+               config.registerTypeWithKryoSerializer(MyObj.class, new 
MyObjSerializer());
+               final BaseArraySerializer serializer = 
createSerializerWithConfig(config);
+
+               MyObj inputObj = new MyObj(114514, 1919810);
+               BaseArray inputArray = new GenericArray(new BinaryGeneric[] {
+                       new BinaryGeneric<>(inputObj, new 
KryoSerializer<>(MyObj.class, config))
+               }, 1);
+
+               byte[] serialized;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       serializer.serialize(inputArray, new 
DataOutputViewStreamWrapper(out));
+                       serialized = out.toByteArray();
+               }
+
+               // deserialize base array using restored serializer
+               final BaseArraySerializer restoreSerializer =
+                       (BaseArraySerializer) 
snapshotAndReconfigure(serializer, () -> createSerializerWithConfig(config));
+
+               BaseArray outputArray;
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serialized)) {
+                       outputArray = restoreSerializer.deserialize(new 
DataInputViewStreamWrapper(in));
+               }
+
+               TypeSerializer restoreEleSer = restoreSerializer.getEleSer();
+               assertEquals(serializer.getEleSer(), restoreEleSer);
+
+               MyObj outputObj = BinaryGeneric.getJavaObjectFromBinaryGeneric(
+                       outputArray.getGeneric(0), new 
KryoSerializer<>(MyObj.class, config));
+               assertEquals(inputObj, outputObj);
+       }
+
+       private BaseArraySerializer createSerializerWithConfig(ExecutionConfig 
config) {
+               return new BaseArraySerializer(
+                       
DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType(), config);
+       }
+
        @Override
        protected BaseArraySerializer createSerializer() {
                return new 
BaseArraySerializer(DataTypes.STRING().getLogicalType(), new ExecutionConfig());
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
index 38216dc..6c7710b 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
@@ -18,20 +18,36 @@
 
 package org.apache.flink.table.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.dataformat.BaseMap;
 import org.apache.flink.table.dataformat.BinaryArray;
 import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryGeneric;
 import org.apache.flink.table.dataformat.BinaryMap;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.dataformat.GenericMap;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.testutils.DeeplyEqualsChecker;
 
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObj;
+import static 
org.apache.flink.table.typeutils.SerializerTestUtil.MyObjSerializer;
+import static 
org.apache.flink.table.typeutils.SerializerTestUtil.snapshotAndReconfigure;
+import static org.junit.Assert.assertEquals;
+
 /**
  * A test for the {@link BaseMapSerializer}.
  */
@@ -55,8 +71,56 @@ public class BaseMapSerializerTest extends 
SerializerTestBase<BaseMap> {
                ));
        }
 
+       @Test
+       public void testExecutionConfigWithKryo() throws Exception {
+               // serialize base map
+               ExecutionConfig config = new ExecutionConfig();
+               config.enableForceKryo();
+               config.registerTypeWithKryoSerializer(MyObj.class, new 
MyObjSerializer());
+               final BaseMapSerializer serializer = 
createSerializerWithConfig(config);
+
+               int inputKey = 998244353;
+               MyObj inputObj = new MyObj(114514, 1919810);
+               Map<Object, Object> javaMap = new HashMap<>();
+               javaMap.put(inputKey, new BinaryGeneric<>(inputObj, new 
KryoSerializer<>(MyObj.class, config)));
+               BaseMap inputMap = new GenericMap(javaMap);
+
+               byte[] serialized;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       serializer.serialize(inputMap, new 
DataOutputViewStreamWrapper(out));
+                       serialized = out.toByteArray();
+               }
+
+               // deserialize base map using restored serializer
+               final BaseMapSerializer restoreSerializer =
+                       (BaseMapSerializer) snapshotAndReconfigure(serializer, 
() -> createSerializerWithConfig(config));
+
+               BaseMap outputMap;
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serialized)) {
+                       outputMap = restoreSerializer.deserialize(new 
DataInputViewStreamWrapper(in));
+               }
+
+               TypeSerializer restoreKeySer = 
restoreSerializer.getKeySerializer();
+               TypeSerializer restoreValueSer = 
restoreSerializer.getValueSerializer();
+               assertEquals(serializer.getKeySerializer(), restoreKeySer);
+               assertEquals(serializer.getValueSerializer(), restoreValueSer);
+
+               MyObj outputObj = BinaryGeneric.getJavaObjectFromBinaryGeneric(
+                       (BinaryGeneric) outputMap.toJavaMap(
+                               DataTypes.INT().getLogicalType(), 
DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType())
+                               .get(inputKey), new 
KryoSerializer<>(MyObj.class, config));
+               assertEquals(inputObj, outputObj);
+       }
+
+       private BaseMapSerializer createSerializerWithConfig(ExecutionConfig 
config) {
+               return new BaseMapSerializer(
+                       DataTypes.INT().getLogicalType(),
+                       
DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType(),
+                       config);
+       }
+
        private static BaseMapSerializer newSer() {
-               return new BaseMapSerializer(INT, STRING);
+               return new BaseMapSerializer(INT, STRING, new 
ExecutionConfig());
        }
 
        @Override
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java
new file mode 100644
index 0000000..0587e07
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Utils for testing serializers.
+ */
+public class SerializerTestUtil {
+
+       /**
+        * Snapshot and restore the given serializer. Returns the restored 
serializer.
+        */
+       public static <T> TypeSerializer<T> snapshotAndReconfigure(
+               TypeSerializer<T> serializer, SerializerGetter<T> 
serializerGetter) throws IOException {
+               TypeSerializerSnapshot<T> configSnapshot = 
serializer.snapshotConfiguration();
+
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+                               new DataOutputViewStreamWrapper(out), 
configSnapshot, serializer);
+                       serializedConfig = out.toByteArray();
+               }
+
+               TypeSerializerSnapshot<T> restoredConfig;
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       restoredConfig = 
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+                               new DataInputViewStreamWrapper(in),
+                               Thread.currentThread().getContextClassLoader(),
+                               serializerGetter.getSerializer());
+               }
+
+               TypeSerializerSchemaCompatibility<T> strategy =
+                       
restoredConfig.resolveSchemaCompatibility(serializerGetter.getSerializer());
+               final TypeSerializer<T> restoredSerializer;
+               if (strategy.isCompatibleAsIs()) {
+                       restoredSerializer = restoredConfig.restoreSerializer();
+               }
+               else if (strategy.isCompatibleWithReconfiguredSerializer()) {
+                       restoredSerializer = 
strategy.getReconfiguredSerializer();
+               }
+               else {
+                       throw new AssertionError("Unable to restore serializer 
with " + strategy);
+               }
+               assertEquals(serializer.getClass(), 
restoredSerializer.getClass());
+
+               return restoredSerializer;
+       }
+
+       /**
+        * Used for snapshotAndReconfigure method to provide serializers when 
restoring.
+        */
+       public interface SerializerGetter<T> {
+               TypeSerializer<T> getSerializer();
+       }
+
+       /**
+        * A simple POJO.
+        */
+       public static class MyObj {
+               private int a;
+               private int b;
+
+               MyObj(int a, int b) {
+                       this.a = a;
+                       this.b = b;
+               }
+
+               int getA() {
+                       return a;
+               }
+
+               int getB() {
+                       return b;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       return o instanceof MyObj && ((MyObj) o).a == a && 
((MyObj) o).b == b;
+               }
+       }
+
+       /**
+        * Kryo serializer for POJO.
+        */
+       public static class MyObjSerializer extends Serializer<MyObj> 
implements Serializable {
+
+               private static final long serialVersionUID = 1L;
+               private static final int delta = 7;
+
+               @Override
+               public void write(Kryo kryo, Output output, MyObj myObj) {
+                       output.writeInt(myObj.getA() + delta);
+                       output.writeInt(myObj.getB() + delta);
+               }
+
+               @Override
+               public MyObj read(Kryo kryo, Input input, Class<MyObj> aClass) {
+                       int a = input.readInt() - delta;
+                       int b = input.readInt() - delta;
+                       return new MyObj(a, b);
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       return o instanceof MyObjSerializer;
+               }
+       }
+}

Reply via email to