This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 56e45c8397bdaf3a8dec45164021c54db5b47e7e Author: TsReaper <[email protected]> AuthorDate: Thu Jul 18 18:47:42 2019 +0800 [FLINK-13322][table-runtime-blink] Fix serializer snapshot recovery in BaseArray and BaseMap serializers --- .../flink/table/types/InternalSerializers.java | 4 +- .../flink/table/typeutils/BaseArraySerializer.java | 26 +++- .../flink/table/typeutils/BaseMapSerializer.java | 59 +++++++-- .../apache/flink/table/dataformat/BaseRowTest.java | 3 +- .../flink/table/dataformat/BinaryArrayTest.java | 4 +- .../table/typeutils/BaseArraySerializerTest.java | 57 ++++++++ .../table/typeutils/BaseMapSerializerTest.java | 66 +++++++++- .../flink/table/typeutils/SerializerTestUtil.java | 143 +++++++++++++++++++++ 8 files changed, 337 insertions(+), 25 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java index e6eaf81..c8d5b76 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java @@ -80,9 +80,9 @@ public class InternalSerializers { return new BaseArraySerializer(((ArrayType) type).getElementType(), config); case MAP: MapType mapType = (MapType) type; - return new BaseMapSerializer(mapType.getKeyType(), mapType.getValueType()); + return new BaseMapSerializer(mapType.getKeyType(), mapType.getValueType(), config); case MULTISET: - return new BaseMapSerializer(((MultisetType) type).getElementType(), new IntType()); + return new BaseMapSerializer(((MultisetType) type).getElementType(), new IntType(), config); case ROW: RowType rowType = (RowType) type; return new BaseRowSerializer(config, rowType); diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java index d8bf95e..c749dfa 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java @@ -17,6 +17,7 @@ package org.apache.flink.table.typeutils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; @@ -216,9 +217,14 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> { return eleType.hashCode(); } + @VisibleForTesting + public TypeSerializer getEleSer() { + return eleSer; + } + @Override public TypeSerializerSnapshot<BaseArray> snapshotConfiguration() { - return new BaseArraySerializerSnapshot(eleType); + return new BaseArraySerializerSnapshot(eleType, eleSer); } /** @@ -228,14 +234,16 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> { private static final int CURRENT_VERSION = 3; private LogicalType previousType; + private TypeSerializer previousEleSer; @SuppressWarnings("unused") public BaseArraySerializerSnapshot() { // this constructor is used when restoring from a checkpoint/savepoint. 
} - BaseArraySerializerSnapshot(LogicalType eleType) { + BaseArraySerializerSnapshot(LogicalType eleType, TypeSerializer eleSer) { this.previousType = eleType; + this.previousEleSer = eleSer; } @Override @@ -245,14 +253,17 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> { @Override public void writeSnapshot(DataOutputView out) throws IOException { - InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousType); + DataOutputViewStream outStream = new DataOutputViewStream(out); + InstantiationUtil.serializeObject(outStream, previousType); + InstantiationUtil.serializeObject(outStream, previousEleSer); } @Override public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { try { - this.previousType = InstantiationUtil.deserializeObject( - new DataInputViewStream(in), userCodeClassLoader); + DataInputViewStream inStream = new DataInputViewStream(in); + this.previousType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); + this.previousEleSer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); } catch (ClassNotFoundException e) { throw new IOException(e); } @@ -260,7 +271,7 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> { @Override public TypeSerializer<BaseArray> restoreSerializer() { - return new BaseArraySerializer(previousType, new ExecutionConfig()); + return new BaseArraySerializer(previousType, previousEleSer); } @Override @@ -270,7 +281,8 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> { } BaseArraySerializer newBaseArraySerializer = (BaseArraySerializer) newSerializer; - if (!previousType.equals(newBaseArraySerializer.eleType)) { + if (!previousType.equals(newBaseArraySerializer.eleType) || + !previousEleSer.equals(newBaseArraySerializer.eleSer)) { return TypeSerializerSchemaCompatibility.incompatible(); } else { return TypeSerializerSchemaCompatibility.compatibleAsIs(); diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java index 01dba56..766c3b5 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java @@ -17,6 +17,7 @@ package org.apache.flink.table.typeutils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; @@ -62,12 +63,21 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> { private transient BinaryArrayWriter reuseKeyWriter; private transient BinaryArrayWriter reuseValueWriter; - public BaseMapSerializer(LogicalType keyType, LogicalType valueType) { + public BaseMapSerializer(LogicalType keyType, LogicalType valueType, ExecutionConfig conf) { this.keyType = keyType; this.valueType = valueType; - this.keySerializer = InternalSerializers.create(keyType, new ExecutionConfig()); - this.valueSerializer = InternalSerializers.create(valueType, new ExecutionConfig()); + this.keySerializer = InternalSerializers.create(keyType, conf); + this.valueSerializer = InternalSerializers.create(valueType, conf); + } + + private BaseMapSerializer( + LogicalType keyType, LogicalType valueType, TypeSerializer keySerializer, TypeSerializer valueSerializer) { + this.keyType = keyType; + this.valueType = valueType; + + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; } @Override @@ -77,7 +87,7 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> { @Override public TypeSerializer<BaseMap> duplicate() { - return new BaseMapSerializer(keyType, valueType); + return 
new BaseMapSerializer(keyType, valueType, keySerializer.duplicate(), valueSerializer.duplicate()); } @Override @@ -217,9 +227,19 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> { return result; } + @VisibleForTesting + public TypeSerializer getKeySerializer() { + return keySerializer; + } + + @VisibleForTesting + public TypeSerializer getValueSerializer() { + return valueSerializer; + } + @Override public TypeSerializerSnapshot<BaseMap> snapshotConfiguration() { - return new BaseMapSerializerSnapshot(keyType, valueType); + return new BaseMapSerializerSnapshot(keyType, valueType, keySerializer, valueSerializer); } /** @@ -231,14 +251,20 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> { private LogicalType previousKeyType; private LogicalType previousValueType; + private TypeSerializer previousKeySerializer; + private TypeSerializer previousValueSerializer; + @SuppressWarnings("unused") public BaseMapSerializerSnapshot() { // this constructor is used when restoring from a checkpoint/savepoint. 
} - BaseMapSerializerSnapshot(LogicalType keyT, LogicalType valueT) { + BaseMapSerializerSnapshot(LogicalType keyT, LogicalType valueT, TypeSerializer keySer, TypeSerializer valueSer) { this.previousKeyType = keyT; this.previousValueType = valueT; + + this.previousKeySerializer = keySer; + this.previousValueSerializer = valueSer; } @Override @@ -248,15 +274,21 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> { @Override public void writeSnapshot(DataOutputView out) throws IOException { - InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousKeyType); - InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousValueType); + DataOutputViewStream outStream = new DataOutputViewStream(out); + InstantiationUtil.serializeObject(outStream, previousKeyType); + InstantiationUtil.serializeObject(outStream, previousValueType); + InstantiationUtil.serializeObject(outStream, previousKeySerializer); + InstantiationUtil.serializeObject(outStream, previousValueSerializer); } @Override public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { try { - this.previousKeyType = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader); - this.previousValueType = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader); + DataInputViewStream inStream = new DataInputViewStream(in); + this.previousKeyType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); + this.previousValueType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); + this.previousKeySerializer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); + this.previousValueSerializer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); } catch (ClassNotFoundException e) { throw new IOException(e); } @@ -264,7 +296,8 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> { @Override public 
TypeSerializer<BaseMap> restoreSerializer() { - return new BaseMapSerializer(previousKeyType, previousValueType); + return new BaseMapSerializer( + previousKeyType, previousValueType, previousKeySerializer, previousValueSerializer); } @Override @@ -275,7 +308,9 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> { BaseMapSerializer newBaseMapSerializer = (BaseMapSerializer) newSerializer; if (!previousKeyType.equals(newBaseMapSerializer.keyType) || - !previousValueType.equals(newBaseMapSerializer.valueType)) { + !previousValueType.equals(newBaseMapSerializer.valueType) || + !previousKeySerializer.equals(newBaseMapSerializer.keySerializer) || + !previousValueSerializer.equals(newBaseMapSerializer.valueSerializer)) { return TypeSerializerSchemaCompatibility.incompatible(); } else { return TypeSerializerSchemaCompatibility.compatibleAsIs(); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java index 43bc6a6..8723405 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java @@ -102,7 +102,8 @@ public class BaseRowTest { writer.writeDecimal(10, decimal1, 5); writer.writeDecimal(11, decimal2, 20); writer.writeArray(12, array, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null)); - writer.writeMap(13, map, new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType())); + writer.writeMap(13, map, new BaseMapSerializer( + DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), null)); writer.writeRow(14, underRow, new BaseRowSerializer(null, RowType.of(new IntType(), new IntType()))); writer.writeBinary(15, bytes); return row; diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java index 54d0802..5063646 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java @@ -325,7 +325,7 @@ public class BinaryArrayTest { BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); writer.setNullAt(0); writer.writeMap(1, BinaryMap.valueOf(subArray, subArray), - new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType())); + new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), null)); writer.complete(); assertTrue(array.isNullAt(0)); @@ -358,7 +358,7 @@ public class BinaryArrayTest { BinaryRow row = new BinaryRow(1); BinaryRowWriter rowWriter = new BinaryRowWriter(row); rowWriter.writeMap(0, binaryMap, - new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType())); + new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), null)); rowWriter.complete(); BinaryMap map = (BinaryMap) row.getMap(0); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java index feb9893..f7f0e31 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java @@ -19,15 +19,31 @@ package org.apache.flink.table.typeutils; import org.apache.flink.api.common.ExecutionConfig; +import 
org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.dataformat.BaseArray; import org.apache.flink.table.dataformat.BinaryArray; import org.apache.flink.table.dataformat.BinaryArrayWriter; +import org.apache.flink.table.dataformat.BinaryGeneric; import org.apache.flink.table.dataformat.BinaryString; import org.apache.flink.table.dataformat.GenericArray; import org.apache.flink.testutils.DeeplyEqualsChecker; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObj; +import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObjSerializer; +import static org.apache.flink.table.typeutils.SerializerTestUtil.snapshotAndReconfigure; +import static org.junit.Assert.assertEquals; + /** * A test for the {@link BaseArraySerializer}. 
*/ @@ -58,6 +74,47 @@ public class BaseArraySerializerTest extends SerializerTestBase<BaseArray> { )); } + @Test + public void testExecutionConfigWithKryo() throws Exception { + // serialize base array + ExecutionConfig config = new ExecutionConfig(); + config.enableForceKryo(); + config.registerTypeWithKryoSerializer(MyObj.class, new MyObjSerializer()); + final BaseArraySerializer serializer = createSerializerWithConfig(config); + + MyObj inputObj = new MyObj(114514, 1919810); + BaseArray inputArray = new GenericArray(new BinaryGeneric[] { + new BinaryGeneric<>(inputObj, new KryoSerializer<>(MyObj.class, config)) + }, 1); + + byte[] serialized; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + serializer.serialize(inputArray, new DataOutputViewStreamWrapper(out)); + serialized = out.toByteArray(); + } + + // deserialize base array using restored serializer + final BaseArraySerializer restoreSerializer = + (BaseArraySerializer) snapshotAndReconfigure(serializer, () -> createSerializerWithConfig(config)); + + BaseArray outputArray; + try (ByteArrayInputStream in = new ByteArrayInputStream(serialized)) { + outputArray = restoreSerializer.deserialize(new DataInputViewStreamWrapper(in)); + } + + TypeSerializer restoreEleSer = restoreSerializer.getEleSer(); + assertEquals(serializer.getEleSer(), restoreEleSer); + + MyObj outputObj = BinaryGeneric.getJavaObjectFromBinaryGeneric( + outputArray.getGeneric(0), new KryoSerializer<>(MyObj.class, config)); + assertEquals(inputObj, outputObj); + } + + private BaseArraySerializer createSerializerWithConfig(ExecutionConfig config) { + return new BaseArraySerializer( + DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType(), config); + } + @Override protected BaseArraySerializer createSerializer() { return new BaseArraySerializer(DataTypes.STRING().getLogicalType(), new ExecutionConfig()); diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java index 38216dc..6c7710b 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java @@ -18,20 +18,36 @@ package org.apache.flink.table.typeutils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.dataformat.BaseMap; import org.apache.flink.table.dataformat.BinaryArray; import org.apache.flink.table.dataformat.BinaryArrayWriter; +import org.apache.flink.table.dataformat.BinaryGeneric; import org.apache.flink.table.dataformat.BinaryMap; import org.apache.flink.table.dataformat.BinaryString; import org.apache.flink.table.dataformat.GenericMap; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.testutils.DeeplyEqualsChecker; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObj; +import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObjSerializer; +import static org.apache.flink.table.typeutils.SerializerTestUtil.snapshotAndReconfigure; +import static org.junit.Assert.assertEquals; + /** * A 
test for the {@link BaseMapSerializer}. */ @@ -55,8 +71,56 @@ public class BaseMapSerializerTest extends SerializerTestBase<BaseMap> { )); } + @Test + public void testExecutionConfigWithKryo() throws Exception { + // serialize base map + ExecutionConfig config = new ExecutionConfig(); + config.enableForceKryo(); + config.registerTypeWithKryoSerializer(MyObj.class, new MyObjSerializer()); + final BaseMapSerializer serializer = createSerializerWithConfig(config); + + int inputKey = 998244353; + MyObj inputObj = new MyObj(114514, 1919810); + Map<Object, Object> javaMap = new HashMap<>(); + javaMap.put(inputKey, new BinaryGeneric<>(inputObj, new KryoSerializer<>(MyObj.class, config))); + BaseMap inputMap = new GenericMap(javaMap); + + byte[] serialized; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + serializer.serialize(inputMap, new DataOutputViewStreamWrapper(out)); + serialized = out.toByteArray(); + } + + // deserialize base map using restored serializer + final BaseMapSerializer restoreSerializer = + (BaseMapSerializer) snapshotAndReconfigure(serializer, () -> createSerializerWithConfig(config)); + + BaseMap outputMap; + try (ByteArrayInputStream in = new ByteArrayInputStream(serialized)) { + outputMap = restoreSerializer.deserialize(new DataInputViewStreamWrapper(in)); + } + + TypeSerializer restoreKeySer = restoreSerializer.getKeySerializer(); + TypeSerializer restoreValueSer = restoreSerializer.getValueSerializer(); + assertEquals(serializer.getKeySerializer(), restoreKeySer); + assertEquals(serializer.getValueSerializer(), restoreValueSer); + + MyObj outputObj = BinaryGeneric.getJavaObjectFromBinaryGeneric( + (BinaryGeneric) outputMap.toJavaMap( + DataTypes.INT().getLogicalType(), DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType()) + .get(inputKey), new KryoSerializer<>(MyObj.class, config)); + assertEquals(inputObj, outputObj); + } + + private BaseMapSerializer createSerializerWithConfig(ExecutionConfig config) { + return
new BaseMapSerializer( + DataTypes.INT().getLogicalType(), + DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType(), + config); + } + private static BaseMapSerializer newSer() { - return new BaseMapSerializer(INT, STRING); + return new BaseMapSerializer(INT, STRING, new ExecutionConfig()); } @Override diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java new file mode 100644 index 0000000..0587e07 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; + +import static org.junit.Assert.assertEquals; + +/** + * Utils for testing serializers. + */ +public class SerializerTestUtil { + + /** + * Snapshot and restore the given serializer. Returns the restored serializer. 
+ */ + public static <T> TypeSerializer<T> snapshotAndReconfigure( + TypeSerializer<T> serializer, SerializerGetter<T> serializerGetter) throws IOException { + TypeSerializerSnapshot<T> configSnapshot = serializer.snapshotConfiguration(); + + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot( + new DataOutputViewStreamWrapper(out), configSnapshot, serializer); + serializedConfig = out.toByteArray(); + } + + TypeSerializerSnapshot<T> restoredConfig; + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + restoredConfig = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot( + new DataInputViewStreamWrapper(in), + Thread.currentThread().getContextClassLoader(), + serializerGetter.getSerializer()); + } + + TypeSerializerSchemaCompatibility<T> strategy = + restoredConfig.resolveSchemaCompatibility(serializerGetter.getSerializer()); + final TypeSerializer<T> restoredSerializer; + if (strategy.isCompatibleAsIs()) { + restoredSerializer = restoredConfig.restoreSerializer(); + } + else if (strategy.isCompatibleWithReconfiguredSerializer()) { + restoredSerializer = strategy.getReconfiguredSerializer(); + } + else { + throw new AssertionError("Unable to restore serializer with " + strategy); + } + assertEquals(serializer.getClass(), restoredSerializer.getClass()); + + return restoredSerializer; + } + + /** + * Used for snapshotAndReconfigure method to provide serializers when restoring. + */ + public interface SerializerGetter<T> { + TypeSerializer<T> getSerializer(); + } + + /** + * A simple POJO. 
+ */ + public static class MyObj { + private int a; + private int b; + + MyObj(int a, int b) { + this.a = a; + this.b = b; + } + + int getA() { + return a; + } + + int getB() { + return b; + } + + @Override + public boolean equals(Object o) { + return o instanceof MyObj && ((MyObj) o).a == a && ((MyObj) o).b == b; + } + } + + /** + * Kryo serializer for POJO. + */ + public static class MyObjSerializer extends Serializer<MyObj> implements Serializable { + + private static final long serialVersionUID = 1L; + private static final int delta = 7; + + @Override + public void write(Kryo kryo, Output output, MyObj myObj) { + output.writeInt(myObj.getA() + delta); + output.writeInt(myObj.getB() + delta); + } + + @Override + public MyObj read(Kryo kryo, Input input, Class<MyObj> aClass) { + int a = input.readInt() - delta; + int b = input.readInt() - delta; + return new MyObj(a, b); + } + + @Override + public boolean equals(Object o) { + return o instanceof MyObjSerializer; + } + } +}
