http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java index 4712ed1..5459d53 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java @@ -18,29 +18,43 @@ package org.apache.flink.api.java.typeutils.runtime; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Objects; import java.util.Random; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; import org.apache.flink.api.common.operators.Keys.ExpressionKeys; import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerUtil; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import 
org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * A test for the {@link PojoSerializer}. */ @@ -191,6 +205,20 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te return true; } } + + public static class SubTestUserClassA extends TestUserClass { + public int subDumm1; + public String subDumm2; + + public SubTestUserClassA() {} + } + + public static class SubTestUserClassB extends TestUserClass { + public Double subDumm1; + public float subDumm2; + + public SubTestUserClassB() {} + } /** * This tests if the hashes returned by the pojo and tuple comparators are the same @@ -240,4 +268,244 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te Assert.assertTrue("The hashing for tuples and pojos must be the same, so that they are mixable. Also for those with multiple key fields", multiPojoHash == multiTupleHash); } -} + + // -------------------------------------------------------------------------------------------- + // Configuration snapshotting & reconfiguring tests + // -------------------------------------------------------------------------------------------- + + /** + * Verifies that reconfiguring with a config snapshot of a preceding POJO serializer + * with different POJO type will result in INCOMPATIBLE. 
+ */ + @Test + public void testReconfigureWithDifferentPojoType() throws Exception { + PojoSerializer<SubTestUserClassB> pojoSerializer1 = (PojoSerializer<SubTestUserClassB>) + TypeExtractor.getForClass(SubTestUserClassB.class).createSerializer(new ExecutionConfig()); + + // snapshot configuration and serialize to bytes + TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer1.snapshotConfiguration(); + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); + serializedConfig = out.toByteArray(); + } + + PojoSerializer<SubTestUserClassA> pojoSerializer2 = (PojoSerializer<SubTestUserClassA>) + TypeExtractor.getForClass(SubTestUserClassA.class).createSerializer(new ExecutionConfig()); + + // read configuration again from bytes + try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + CompatibilityResult<SubTestUserClassA> compatResult = pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot); + assertTrue(compatResult.requiresMigration()); + } + + /** + * Tests that reconfiguration correctly reorders subclass registrations to their previous order. 
+ */ + @Test + public void testReconfigureDifferentSubclassRegistrationOrder() throws Exception { + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.registerPojoType(SubTestUserClassA.class); + executionConfig.registerPojoType(SubTestUserClassB.class); + + PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(executionConfig); + + // get original registration ids + int subClassATag = pojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class); + int subClassBTag = pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class); + + // snapshot configuration and serialize to bytes + TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration(); + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); + serializedConfig = out.toByteArray(); + } + + // use new config and instantiate new PojoSerializer + executionConfig = new ExecutionConfig(); + executionConfig.registerPojoType(SubTestUserClassB.class); // test with B registered before A + executionConfig.registerPojoType(SubTestUserClassA.class); + + pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(executionConfig); + + // read configuration from bytes + try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot); + assertTrue(!compatResult.requiresMigration()); + + // reconfigure - check reconfiguration result and that registration ids remains the same + //assertEquals(ReconfigureResult.COMPATIBLE, 
pojoSerializer.reconfigure(pojoSerializerConfigSnapshot)); + assertEquals(subClassATag, pojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class).intValue()); + assertEquals(subClassBTag, pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class).intValue()); + } + + /** + * Tests that reconfiguration repopulates previously cached subclass serializers. + */ + @Test + public void testReconfigureRepopulateNonregisteredSubclassSerializerCache() throws Exception { + // don't register any subclasses + PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig()); + + // create cached serializers for SubTestUserClassA and SubTestUserClassB + pojoSerializer.getSubclassSerializer(SubTestUserClassA.class); + pojoSerializer.getSubclassSerializer(SubTestUserClassB.class); + + assertEquals(2, pojoSerializer.getSubclassSerializerCache().size()); + assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class)); + assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class)); + + // snapshot configuration and serialize to bytes + TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration(); + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); + serializedConfig = out.toByteArray(); + } + + // instantiate new PojoSerializer + + pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig()); + + // read configuration from bytes + try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + // reconfigure - check reconfiguration 
result and that subclass serializer cache is repopulated + CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot); + assertFalse(compatResult.requiresMigration()); + assertEquals(2, pojoSerializer.getSubclassSerializerCache().size()); + assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class)); + assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class)); + } + + /** + * Tests that: + * - Previous Pojo serializer did not have registrations, and created cached serializers for subclasses + * - On restore, it had those subclasses registered + * + * In this case, after reconfiguration, the cache should be repopulated, and registrations should + * also exist for the subclasses. + * + * Note: the cache still needs to be repopulated because previous data of those subclasses were + * written with the cached serializers. In this case, the repopulated cache has reconfigured serializers + * for the subclasses so that previous written data can be read, but the registered serializers + * for the subclasses do not necessarily need to be reconfigured since they will only be used to + * write new data. 
+ */ + @Test + public void testReconfigureWithPreviouslyNonregisteredSubclasses() throws Exception { + // don't register any subclasses at first + PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig()); + + // create cached serializers for SubTestUserClassA and SubTestUserClassB + pojoSerializer.getSubclassSerializer(SubTestUserClassA.class); + pojoSerializer.getSubclassSerializer(SubTestUserClassB.class); + + // make sure serializers are in cache + assertEquals(2, pojoSerializer.getSubclassSerializerCache().size()); + assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class)); + assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class)); + + // make sure that registrations are empty + assertTrue(pojoSerializer.getRegisteredClasses().isEmpty()); + assertEquals(0, pojoSerializer.getRegisteredSerializers().length); + + // snapshot configuration and serialize to bytes + TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration(); + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); + serializedConfig = out.toByteArray(); + } + + // instantiate new PojoSerializer, with new execution config that has the subclass registrations + ExecutionConfig newExecutionConfig = new ExecutionConfig(); + newExecutionConfig.registerPojoType(SubTestUserClassA.class); + newExecutionConfig.registerPojoType(SubTestUserClassB.class); + pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(newExecutionConfig); + + // read configuration from bytes + try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader()); + } + + // reconfigure - check reconfiguration result and that + // 1) subclass serializer cache is repopulated + // 2) registrations also contain the now registered subclasses + CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot); + assertFalse(compatResult.requiresMigration()); + assertEquals(2, pojoSerializer.getSubclassSerializerCache().size()); + assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class)); + assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class)); + assertEquals(2, pojoSerializer.getRegisteredClasses().size()); + assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassA.class)); + assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassB.class)); + } + + /** + * Verifies that reconfiguration reorders the fields of the new Pojo serializer to remain the same. 
+ */ + @Test + public void testReconfigureWithDifferentFieldOrder() throws Exception { + Field[] mockOriginalFieldOrder = { + TestUserClass.class.getField("dumm4"), + TestUserClass.class.getField("dumm3"), + TestUserClass.class.getField("nestedClass"), + TestUserClass.class.getField("dumm1"), + TestUserClass.class.getField("dumm2"), + TestUserClass.class.getField("dumm5"), + }; + + // creating this serializer just for generating config snapshots of the field serializers + PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig()); + + LinkedHashMap<Field, TypeSerializerConfigSnapshot> mockOriginalFieldToSerializerConfigSnapshot = + new LinkedHashMap<>(mockOriginalFieldOrder.length); + mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[0], ser.getFieldSerializers()[3].snapshotConfiguration()); + mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[1], ser.getFieldSerializers()[2].snapshotConfiguration()); + mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[2], ser.getFieldSerializers()[5].snapshotConfiguration()); + mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[3], ser.getFieldSerializers()[0].snapshotConfiguration()); + mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[4], ser.getFieldSerializers()[1].snapshotConfiguration()); + mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[5], ser.getFieldSerializers()[4].snapshotConfiguration()); + + PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig()); + + assertEquals(TestUserClass.class.getField("dumm1"), pojoSerializer.getFields()[0]); + assertEquals(TestUserClass.class.getField("dumm2"), pojoSerializer.getFields()[1]); + assertEquals(TestUserClass.class.getField("dumm3"), pojoSerializer.getFields()[2]); + assertEquals(TestUserClass.class.getField("dumm4"), 
pojoSerializer.getFields()[3]); + assertEquals(TestUserClass.class.getField("dumm5"), pojoSerializer.getFields()[4]); + assertEquals(TestUserClass.class.getField("nestedClass"), pojoSerializer.getFields()[5]); + + PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> mockPreviousConfigSnapshot = + new PojoSerializer.PojoSerializerConfigSnapshot<>( + TestUserClass.class, + mockOriginalFieldToSerializerConfigSnapshot, // this mocks the previous field order + new LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot>(), // empty; irrelevant for this test + new HashMap<Class<?>, TypeSerializerConfigSnapshot>()); // empty; irrelevant for this test + + // reconfigure - check reconfiguration result and that fields are reordered to the previous order + CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility( + + mockPreviousConfigSnapshot); + assertFalse(compatResult.requiresMigration()); + int i = 0; + for (Field field : mockOriginalFieldOrder) { + assertEquals(field, pojoSerializer.getFields()[i]); + i++; + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java new file mode 100644 index 0000000..60c4dc4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests related to configuration snapshotting and reconfiguring for the {@link KryoSerializer}. + */ +public class KryoSerializerCompatibilityTest { + + /** + * Verifies that reconfiguration result is INCOMPATIBLE if data type has changed. 
+ */ + @Test + public void testMigrationStrategyWithDifferentKryoType() throws Exception { + KryoSerializer<TestClassA> kryoSerializerForA = new KryoSerializer<>(TestClassA.class, new ExecutionConfig()); + + // snapshot configuration and serialize to bytes + TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializerForA.snapshotConfiguration(); + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot); + serializedConfig = out.toByteArray(); + } + + KryoSerializer<TestClassB> kryoSerializerForB = new KryoSerializer<>(TestClassB.class, new ExecutionConfig()); + + // read configuration again from bytes + try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + CompatibilityResult<TestClassB> compatResult = kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot); + assertTrue(compatResult.requiresMigration()); + } + + /** + * Tests that after reconfiguration, registration ids are reconfigured to + * remain the same as the preceding KryoSerializer. 
+ */ + @Test + public void testMigrationStrategyForDifferentRegistrationOrder() throws Exception { + + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.registerKryoType(TestClassA.class); + executionConfig.registerKryoType(TestClassB.class); + + KryoSerializer<TestClass> kryoSerializer = new KryoSerializer<>(TestClass.class, executionConfig); + + // get original registration ids + int testClassId = kryoSerializer.getKryo().getRegistration(TestClass.class).getId(); + int testClassAId = kryoSerializer.getKryo().getRegistration(TestClassA.class).getId(); + int testClassBId = kryoSerializer.getKryo().getRegistration(TestClassB.class).getId(); + + // snapshot configuration and serialize to bytes + TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializer.snapshotConfiguration(); + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot); + serializedConfig = out.toByteArray(); + } + + // use new config and instantiate new KryoSerializer + executionConfig = new ExecutionConfig(); + executionConfig.registerKryoType(TestClassB.class); // test with B registered before A + executionConfig.registerKryoType(TestClassA.class); + + kryoSerializer = new KryoSerializer<>(TestClass.class, executionConfig); + + // read configuration from bytes + try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + // reconfigure - check reconfiguration result and that registration id remains the same + CompatibilityResult<TestClass> compatResult = kryoSerializer.ensureCompatibility(kryoSerializerConfigSnapshot); + assertFalse(compatResult.requiresMigration()); + assertEquals(testClassId, 
kryoSerializer.getKryo().getRegistration(TestClass.class).getId()); + assertEquals(testClassAId, kryoSerializer.getKryo().getRegistration(TestClassA.class).getId()); + assertEquals(testClassBId, kryoSerializer.getKryo().getRegistration(TestClassB.class).getId()); + } + + private static class TestClass {} + + private static class TestClassA {} + + private static class TestClassB {} + + private static class TestClassBSerializer extends Serializer { + @Override + public void write(Kryo kryo, Output output, Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public Object read(Kryo kryo, Input input, Class aClass) { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java index 3bb40eb..4dabaca 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java @@ -26,7 +26,9 @@ import static org.junit.Assert.fail; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.memory.DataInputView; @@ -386,5 +388,15 @@ public class CollectionInputFormatTest { public int hashCode() { return 
Objects.hash(failOnRead, failOnWrite); } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException(); + } + + @Override + public CompatibilityResult<ElementType> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new UnsupportedOperationException(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java index 846b6c3..9e22fc2 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java @@ -18,7 +18,10 @@ package org.apache.flink.cep; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -39,7 +42,8 @@ import java.util.IdentityHashMap; * * @param <T> Type of the element to be serialized */ -public class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> { +@Internal +public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> { private static final long serialVersionUID = -7633631762221447524L; // underlying type serializer @@ -192,4 +196,14 @@ public class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> { this.identityMap = new IdentityHashMap<>(); this.elementList = new ArrayList<>(); } + + @Override + public 
TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException("This serializer is not registered for managed state."); + } + + @Override + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new UnsupportedOperationException("This serializer is not registered for managed state."); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index b8c4e65..70755e5 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -21,6 +21,22 @@ package org.apache.flink.cep.nfa; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.LinkedHashMultimap; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; +import org.apache.flink.cep.NonDuplicatingTypeSerializer; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Preconditions; + +import 
javax.annotation.Nullable; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -43,20 +59,6 @@ import java.util.Set; import java.util.Stack; import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.annotation.Nullable; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; -import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; -import org.apache.flink.cep.NonDuplicatingTypeSerializer; -import org.apache.flink.cep.nfa.compiler.NFACompiler; -import org.apache.flink.cep.pattern.conditions.IterativeCondition; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.util.Preconditions; /** * Non-deterministic finite automaton implementation. @@ -859,7 +861,7 @@ public class NFA<T> implements Serializable { /** * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization. 
*/ - public static class Serializer<T> extends TypeSerializer<NFA<T>> { + public static class Serializer<T> extends TypeSerializerSingleton<NFA<T>> { private static final long serialVersionUID = 1L; @@ -869,11 +871,6 @@ public class NFA<T> implements Serializable { } @Override - public TypeSerializer<NFA<T>> duplicate() { - return this; - } - - @Override public NFA<T> createInstance() { return null; } @@ -944,18 +941,8 @@ public class NFA<T> implements Serializable { } @Override - public boolean equals(Object obj) { - return obj instanceof Serializer && ((Serializer) obj).canEqual(this); - } - - @Override public boolean canEqual(Object obj) { return obj instanceof Serializer; } - - @Override - public int hashCode() { - return getClass().hashCode(); - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index b6374cd..14235dc 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -21,7 +21,10 @@ package org.apache.flink.cep.operator; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import 
org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; @@ -473,11 +476,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> } @Override - public boolean canRestoreFrom(TypeSerializer<?> other) { - return equals(other) || other instanceof AbstractKeyedCEPPatternOperator.PriorityQueueSerializer; - } - - @Override public boolean equals(Object obj) { if (obj instanceof PriorityQueueSerializer) { @SuppressWarnings("unchecked") @@ -498,6 +496,32 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> public int hashCode() { return Objects.hash(factory, elementSerializer); } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration()); + } + + @Override + public CompatibilityResult<PriorityQueue<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof CollectionSerializerConfigSnapshot) { + CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility( + ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + + if (!compatResult.requiresMigration()) { + return CompatibilityResult.compatible(); + } else if (compatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new PriorityQueueSerializer<>(compatResult.getConvertDeserializer(), factory)); + } + } + + return CompatibilityResult.requiresMigration(null); + } } private interface PriorityQueueFactory<T> extends 
Serializable { http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java index b86fe87..5984122 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.types.valuearray; import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -82,4 +83,10 @@ public final class IntValueArraySerializer extends TypeSerializerSingleton<IntVa public boolean canEqual(Object obj) { return obj instanceof IntValueArraySerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(IntPrimitiveArraySerializer.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java index 95219b6..e95a1a7 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.types.valuearray; import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -82,4 +83,10 @@ public final class LongValueArraySerializer extends TypeSerializerSingleton<Long public boolean canEqual(Object obj) { return obj instanceof LongValueArraySerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java index 0e875e3..6dbe0e5 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.types.valuearray; import 
org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -82,4 +83,10 @@ public final class StringValueArraySerializer extends TypeSerializerSingleton<St public boolean canEqual(Object obj) { return obj instanceof StringValueArraySerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(StringArraySerializer.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala index 1f56a98..7ffa57c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.runtime.types -import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils._ import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.types.Row @@ -75,4 +75,55 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali } override def hashCode: Int = rowSerializer.hashCode() * 13 + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // 
-------------------------------------------------------------------------------------------- + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = { + new CRowSerializer.CRowSerializerConfigSnapshot( + rowSerializer.snapshotConfiguration()) + } + + override def ensureCompatibility( + configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[CRow] = { + + configSnapshot match { + case crowSerializerConfigSnapshot: CRowSerializer.CRowSerializerConfigSnapshot => + val compatResult = rowSerializer.ensureCompatibility( + crowSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot) + + if (compatResult.requiresMigration()) { + if (compatResult.getConvertDeserializer != null) { + CompatibilityResult.requiresMigration( + new CRowSerializer(compatResult.getConvertDeserializer) + ) + } else { + CompatibilityResult.requiresMigration(null) + } + } else { + CompatibilityResult.compatible() + } + + case _ => CompatibilityResult.requiresMigration(null) + } + } +} + +object CRowSerializer { + + class CRowSerializerConfigSnapshot( + private var rowSerializerConfigSnapshot: TypeSerializerConfigSnapshot) + extends CompositeTypeSerializerConfigSnapshot(rowSerializerConfigSnapshot) { + + /** This empty nullary constructor is required for deserializing the configuration. 
*/ + def this() = this(null) + + override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION + } + + object CRowSerializerConfigSnapshot { + val VERSION = 1 + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java index c6813b6..c4e23ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java @@ -18,7 +18,10 @@ package org.apache.flink.migration; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -51,8 +54,7 @@ public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializab @Override public TypeSerializer<Serializable> duplicate() { - throw new UnsupportedOperationException( - "This is just a proxy used during migration until the real type serializer is provided by the user."); + return this; } @Override @@ -103,6 +105,17 @@ public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializab } @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return new ParameterlessTypeSerializerConfig(getClass().getCanonicalName()); + } + + @Override + public CompatibilityResult<Serializable> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + // 
always assume compatibility since we're just a proxy for migration + return CompatibilityResult.compatible(); + } + + @Override public boolean equals(Object obj) { return obj instanceof MigrationNamespaceSerializerProxy; } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java index f47989a..f58070e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; @@ -85,14 +85,14 @@ public abstract class AbstractMigrationRestoreStrategy<K, N, S> implements Migra patchedNamespaceSerializer = (TypeSerializer<N>) VoidNamespaceSerializer.INSTANCE; } - RegisteredBackendStateMetaInfo<N, S> registeredBackendStateMetaInfo = - new RegisteredBackendStateMetaInfo<>( + RegisteredKeyedBackendStateMetaInfo<N, S> registeredKeyedBackendStateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( 
StateDescriptor.Type.UNKNOWN, stateName, patchedNamespaceSerializer, stateSerializer); - final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredBackendStateMetaInfo); + final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredKeyedBackendStateMetaInfo); final DataInputView inView = openDataInputView(); final int keyGroup = keyGroupRange.getStartKeyGroup(); final int numNamespaces = inView.readInt(); http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java index 0badb41..8fbc227 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java @@ -17,7 +17,10 @@ */ package org.apache.flink.runtime.state; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -132,4 +135,30 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> { public int hashCode() { return elementSerializer.hashCode(); } -} \ No newline at end of file + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public 
TypeSerializerConfigSnapshot snapshotConfiguration() { + return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration()); + } + + @Override + public CompatibilityResult<ArrayList<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof CollectionSerializerConfigSnapshot) { + CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility( + ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + + if (!compatResult.requiresMigration()) { + return CompatibilityResult.compatible(); + } else if (compatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new ArrayListSerializer<>(compatResult.getConvertDeserializer())); + } + } + + return CompatibilityResult.requiresMigration(null); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index e7ed26f..ec4aa81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -207,26 +207,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<>(registeredStatesDeepCopies.size()); - List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList = + List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> metaInfoSnapshots = new ArrayList<>(registeredStatesDeepCopies.size()); - for (Map.Entry<String, PartitionableListState<?>> entry : - 
registeredStatesDeepCopies.entrySet()) { - - PartitionableListState<?> state = entry.getValue(); - OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo = - new OperatorBackendSerializationProxy.StateMetaInfo<>( - state.getName(), - state.getPartitionStateSerializer(), - state.getAssignmentMode()); - metaInfoList.add(metaInfo); + for (Map.Entry<String, PartitionableListState<?>> entry : registeredStatesDeepCopies.entrySet()) { + metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot()); } CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle(); DataOutputView dov = new DataOutputViewStreamWrapper(out); OperatorBackendSerializationProxy backendSerializationProxy = - new OperatorBackendSerializationProxy(metaInfoList); + new OperatorBackendSerializationProxy(metaInfoSnapshots); backendSerializationProxy.write(dov); @@ -237,7 +229,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { PartitionableListState<?> value = entry.getValue(); long[] partitionOffsets = value.write(out); - OperatorStateHandle.Mode mode = value.getAssignmentMode(); + OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode(); writtenStatesMetaData.put( entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode)); @@ -254,10 +246,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { return null; } - OperatorStateHandle operatorStateHandle = - new OperatorStateHandle(writtenStatesMetaData, stateHandle); - - return operatorStateHandle; + return new OperatorStateHandle(writtenStatesMetaData, stateHandle); } }; @@ -298,25 +287,23 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { backendSerializationProxy.read(new DataInputViewStreamWrapper(in)); - List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList = - backendSerializationProxy.getNamedStateSerializationProxies(); + List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
restoredMetaInfoSnapshots = + backendSerializationProxy.getStateMetaInfoSnapshots(); // Recreate all PartitionableListStates from the meta info - for (OperatorBackendSerializationProxy.StateMetaInfo<?> stateMetaInfo : metaInfoList) { - PartitionableListState<?> listState = registeredStates.get(stateMetaInfo.getName()); + for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredMetaInfoSnapshots) { + PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName()); if (null == listState) { listState = new PartitionableListState<>( - stateMetaInfo.getName(), - stateMetaInfo.getStateSerializer(), - stateMetaInfo.getMode()); + new RegisteredOperatorBackendStateMetaInfo<>( + restoredMetaInfo.getName(), + restoredMetaInfo.getPartitionStateSerializer(), + restoredMetaInfo.getAssignmentMode())); - registeredStates.put(listState.getName(), listState); + registeredStates.put(listState.getStateMetaInfo().getName(), listState); } else { - Preconditions.checkState(listState.getPartitionStateSerializer().canRestoreFrom( - stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " + - listState.getPartitionStateSerializer() + " is not compatible with " + - stateMetaInfo.getStateSerializer()); + // TODO with eager state registration in place, check here for serializer migration strategies } } @@ -341,7 +328,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } /** - * * Implementation of operator list state. * * @param <S> the type of an operator state partition. 
@@ -349,19 +335,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { static final class PartitionableListState<S> implements ListState<S> { /** - * The name of the state, as registered by the user - */ - private final String name; - - /** - * The type serializer for the elements in the state list - */ - private final TypeSerializer<S> partitionStateSerializer; - - /** - * The mode how elements in this state are assigned to tasks during restore + * Meta information of the state, including state name, assignment mode, and serializer */ - private final OperatorStateHandle.Mode assignmentMode; + private final RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo; /** * The internal list the holds the elements of the state @@ -373,46 +349,26 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { */ private final ArrayListSerializer<S> internalListCopySerializer; - public PartitionableListState( - String name, - TypeSerializer<S> partitionStateSerializer, - OperatorStateHandle.Mode assignmentMode) { - - this(name, partitionStateSerializer, assignmentMode, new ArrayList<S>()); + public PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) { + this(stateMetaInfo, new ArrayList<S>()); } private PartitionableListState( - String name, - TypeSerializer<S> partitionStateSerializer, - OperatorStateHandle.Mode assignmentMode, + RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo, ArrayList<S> internalList) { - this.name = Preconditions.checkNotNull(name); - this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer); - this.assignmentMode = Preconditions.checkNotNull(assignmentMode); + this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); this.internalList = Preconditions.checkNotNull(internalList); - this.internalListCopySerializer = new ArrayListSerializer<>(partitionStateSerializer); + this.internalListCopySerializer = new 
ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer()); } private PartitionableListState(PartitionableListState<S> toCopy) { - this( - toCopy.name, - toCopy.partitionStateSerializer.duplicate(), - toCopy.assignmentMode, - toCopy.internalListCopySerializer.copy(toCopy.internalList)); - } - - public String getName() { - return name; - } - - public OperatorStateHandle.Mode getAssignmentMode() { - return assignmentMode; + this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList)); } - public TypeSerializer<S> getPartitionStateSerializer() { - return partitionStateSerializer; + public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() { + return stateMetaInfo; } public List<S> getInternalList() { @@ -441,8 +397,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { @Override public String toString() { return "PartitionableListState{" + - "name='" + name + '\'' + - ", assignmentMode=" + assignmentMode + + "stateMetaInfo=" + stateMetaInfo + ", internalList=" + internalList + '}'; } @@ -456,7 +411,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { for (int i = 0; i < internalList.size(); ++i) { S element = internalList.get(i); partitionOffsets[i] = out.getPos(); - partitionStateSerializer.serialize(element, dov); + getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov); } return partitionOffsets; @@ -466,7 +421,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { private <S> ListState<S> getListState( ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode) throws IOException { - Preconditions.checkNotNull(stateDescriptor); stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig()); @@ -478,23 +432,27 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name); if (null == 
partitionableListState) { - partitionableListState = new PartitionableListState<>( - name, - partitionStateSerializer, - mode); + new RegisteredOperatorBackendStateMetaInfo<>( + name, + partitionStateSerializer, + mode)); registeredStates.put(name, partitionableListState); } else { + // TODO with eager registration in place, these checks should be moved to restore() + Preconditions.checkState( - partitionableListState.getAssignmentMode().equals(mode), - "Incompatible assignment mode. Provided: " + mode + ", expected: " + - partitionableListState.getAssignmentMode()); + partitionableListState.getStateMetaInfo().getName().equals(name), + "Incompatible state names. " + + "Was [" + partitionableListState.getStateMetaInfo().getName() + "], " + + "registered with [" + name + "]."); + Preconditions.checkState( - stateDescriptor.getElementSerializer(). - canRestoreFrom(partitionableListState.getPartitionStateSerializer()), - "Incompatible type serializers. Provided: " + stateDescriptor.getElementSerializer() + - ", found: " + partitionableListState.getPartitionStateSerializer()); + partitionableListState.getStateMetaInfo().getAssignmentMode().equals(mode), + "Incompatible state assignment modes. 
" + + "Was [" + partitionableListState.getStateMetaInfo().getAssignmentMode() + "], " + + "registered with [" + mode + "]."); } return partitionableListState; @@ -509,7 +467,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { long[] offsets = metaInfo.getOffsets(); if (null != offsets) { DataInputView div = new DataInputViewStreamWrapper(in); - TypeSerializer<S> serializer = stateListForName.getPartitionStateSerializer(); + TypeSerializer<S> serializer = stateListForName.getStateMetaInfo().getPartitionStateSerializer(); for (long offset : offsets) { in.seek(offset); stateListForName.add(serializer.deserialize(div)); http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java index 61cc58c..d52c207 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java @@ -18,7 +18,11 @@ package org.apache.flink.runtime.state; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; @@ -38,7 +42,8 @@ import java.util.Map; * @param <K> The type of the keys in the map. * @param <V> The type of the values in the map. 
*/ -public class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> { +@Internal +public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> { private static final long serialVersionUID = -6885593032367050078L; @@ -190,4 +195,37 @@ public class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> { public int hashCode() { return keySerializer.hashCode() * 31 + valueSerializer.hashCode(); } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return new MapSerializerConfigSnapshot( + keySerializer.snapshotConfiguration(), + valueSerializer.snapshotConfiguration()); + } + + @Override + public CompatibilityResult<HashMap<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof MapSerializerConfigSnapshot) { + TypeSerializerConfigSnapshot[] keyValueSerializerConfigSnapshots = + ((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); + + CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]); + CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]); + + if (!keyCompatResult.requiresMigration() && !valueCompatResult.requiresMigration()) { + return CompatibilityResult.compatible(); + } else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new HashMapSerializer<>( + keyCompatResult.getConvertDeserializer(), + valueCompatResult.getConvertDeserializer())); + } + } + + return CompatibilityResult.requiresMigration(null); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java index 512baf6..d49b1d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; import org.apache.flink.core.memory.DataInputView; @@ -31,7 +31,7 @@ import java.io.Serializable; @SuppressWarnings("serial") @Internal -final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> { +final class JavaSerializer<T extends Serializable> extends TypeSerializerSingleton<T> { private static final long serialVersionUID = 5067491650263321234L; @@ -41,11 +41,6 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> { } @Override - public TypeSerializer<T> duplicate() { - return this; - } - - @Override public T createInstance() { return null; } @@ -98,17 +93,7 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> { } @Override - public boolean equals(Object obj) { - return obj instanceof JavaSerializer; - } - - @Override public boolean canEqual(Object obj) { return obj instanceof JavaSerializer; } - - @Override - public int hashCode() { - return getClass().hashCode(); - } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java index 5661c38..a389c4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java @@ -18,10 +18,8 @@ package org.apache.flink.runtime.state; -import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; -import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.io.VersionMismatchException; import org.apache.flink.core.io.VersionedIOReadableWritable; import org.apache.flink.core.memory.DataInputView; @@ -33,15 +31,15 @@ import java.util.ArrayList; import java.util.List; /** - * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state + * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state * serialization logic here.
*/ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable { - public static final int VERSION = 2; + public static final int VERSION = 3; - private TypeSerializerSerializationProxy<?> keySerializerProxy; - private List<StateMetaInfo<?, ?>> namedStateSerializationProxies; + private TypeSerializer<?> keySerializer; + private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots; private int restoredVersion; private ClassLoader userCodeClassLoader; @@ -50,19 +48,25 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader); } - public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) { - this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer)); - this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies); + public KeyedBackendSerializationProxy( + TypeSerializer<?> keySerializer, + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) { + + this.keySerializer = Preconditions.checkNotNull(keySerializer); + + Preconditions.checkNotNull(stateMetaInfoSnapshots); + Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE); + this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; + this.restoredVersion = VERSION; - Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE); } - public List<StateMetaInfo<?, ?>> getNamedStateSerializationProxies() { - return namedStateSerializationProxies; + public List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> getStateMetaInfoSnapshots() { + return stateMetaInfoSnapshots; } - public TypeSerializerSerializationProxy<?> getKeySerializerProxy() { - return keySerializerProxy; + public TypeSerializer<?> getKeySerializer() { + return keySerializer; } @Override 
@@ -82,20 +86,22 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable @Override public boolean isCompatibleVersion(int version) { - // we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x) - return super.isCompatibleVersion(version) || version == 1; + // we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x) + return super.isCompatibleVersion(version) || version == 2 || version == 1; } @Override public void write(DataOutputView out) throws IOException { super.write(out); - keySerializerProxy.write(out); + new TypeSerializerSerializationProxy<>(keySerializer).write(out); - out.writeShort(namedStateSerializationProxies.size()); + out.writeShort(stateMetaInfoSnapshots.size()); - for (StateMetaInfo<?, ?> kvState : namedStateSerializationProxies) { - kvState.write(out); + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo : stateMetaInfoSnapshots) { + KeyedBackendStateMetaInfoSnapshotReaderWriters + .getWriterForVersion(VERSION, metaInfo) + .writeStateMetaInfo(out); } } @@ -103,132 +109,18 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable public void read(DataInputView in) throws IOException { super.read(in); - keySerializerProxy = new TypeSerializerSerializationProxy<>(userCodeClassLoader); + final TypeSerializerSerializationProxy<?> keySerializerProxy = + new TypeSerializerSerializationProxy<>(userCodeClassLoader); keySerializerProxy.read(in); + this.keySerializer = keySerializerProxy.getTypeSerializer(); int numKvStates = in.readShort(); - namedStateSerializationProxies = new ArrayList<>(numKvStates); - for (int i = 0; i < numKvStates; ++i) { - StateMetaInfo<?, ?> stateSerializationProxy = new StateMetaInfo<>(userCodeClassLoader); - stateSerializationProxy.read(in); - namedStateSerializationProxies.add(stateSerializationProxy); - } - } - - 
//---------------------------------------------------------------------------------------------------------------------- - - /** - * This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a - * keyed backend. - */ - public static class StateMetaInfo<N, S> implements IOReadableWritable { - - private StateDescriptor.Type stateType; - private String stateName; - private TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy; - private TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy; - - private ClassLoader userClassLoader; - - StateMetaInfo(ClassLoader userClassLoader) { - this.userClassLoader = Preconditions.checkNotNull(userClassLoader); - } - - public StateMetaInfo( - StateDescriptor.Type stateType, - String name, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<S> stateSerializer) { - - this.stateType = Preconditions.checkNotNull(stateType); - this.stateName = Preconditions.checkNotNull(name); - this.namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(namespaceSerializer)); - this.stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(stateSerializer)); - } - - public StateDescriptor.Type getStateType() { - return stateType; - } - - public void setStateType(StateDescriptor.Type stateType) { - this.stateType = stateType; - } - - public String getStateName() { - return stateName; - } - - public void setStateName(String stateName) { - this.stateName = stateName; - } - - public TypeSerializerSerializationProxy<N> getNamespaceSerializerSerializationProxy() { - return namespaceSerializerSerializationProxy; - } - - public void setNamespaceSerializerSerializationProxy(TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy) { - this.namespaceSerializerSerializationProxy = namespaceSerializerSerializationProxy; - } - - public 
TypeSerializerSerializationProxy<S> getStateSerializerSerializationProxy() { - return stateSerializerSerializationProxy; - } - - public void setStateSerializerSerializationProxy(TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy) { - this.stateSerializerSerializationProxy = stateSerializerSerializationProxy; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(getStateType().ordinal()); - out.writeUTF(getStateName()); - - getNamespaceSerializerSerializationProxy().write(out); - getStateSerializerSerializationProxy().write(out); - } - - @Override - public void read(DataInputView in) throws IOException { - int enumOrdinal = in.readInt(); - setStateType(StateDescriptor.Type.values()[enumOrdinal]); - setStateName(in.readUTF()); - - namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader); - namespaceSerializerSerializationProxy.read(in); - - stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader); - stateSerializerSerializationProxy.read(in); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - StateMetaInfo<?, ?> that = (StateMetaInfo<?, ?>) o; - - if (!getStateName().equals(that.getStateName())) { - return false; - } - - if (!getNamespaceSerializerSerializationProxy().equals(that.getNamespaceSerializerSerializationProxy())) { - return false; - } - - return getStateSerializerSerializationProxy().equals(that.getStateSerializerSerializationProxy()); - } - - @Override - public int hashCode() { - int result = getStateName().hashCode(); - result = 31 * result + getNamespaceSerializerSerializationProxy().hashCode(); - result = 31 * result + getStateSerializerSerializationProxy().hashCode(); - return result; + stateMetaInfoSnapshots = new ArrayList<>(numKvStates); + for (int i = 0; i < numKvStates; i++) { + 
stateMetaInfoSnapshots.add( + KeyedBackendStateMetaInfoSnapshotReaderWriters + .getReaderForVersion(restoredVersion, userCodeClassLoader) + .readStateMetaInfo(in)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java new file mode 100644 index 0000000..83aa335 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; +import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * Readers and writers for different versions of the {@link RegisteredKeyedBackendStateMetaInfo.Snapshot}. + * Outdated formats are also kept here for documentation of history backlog. + */ +public class KeyedBackendStateMetaInfoSnapshotReaderWriters { + + // ------------------------------------------------------------------------------- + // Writers + // - v1: Flink 1.2.x + // - v2: Flink 1.3.x + // ------------------------------------------------------------------------------- + + public static <N, S> KeyedBackendStateMetaInfoWriter getWriterForVersion( + int version, RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) { + + switch (version) { + case 1: + case 2: + return new KeyedBackendStateMetaInfoWriterV1V2<>(stateMetaInfo); + + // current version + case KeyedBackendSerializationProxy.VERSION: + return new KeyedBackendStateMetaInfoWriterV3<>(stateMetaInfo); + + default: + // guard for future + throw new IllegalStateException( + "Unrecognized keyed backend state meta info writer version: " + version); + } + } + + public interface KeyedBackendStateMetaInfoWriter { + void writeStateMetaInfo(DataOutputView out) throws IOException; + } + + static abstract class AbstractKeyedBackendStateMetaInfoWriter<N, S> implements KeyedBackendStateMetaInfoWriter { + + 
protected final RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo; + + public AbstractKeyedBackendStateMetaInfoWriter(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) { + this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); + } + + } + + static class KeyedBackendStateMetaInfoWriterV1V2<N, S> extends AbstractKeyedBackendStateMetaInfoWriter<N, S> { + + public KeyedBackendStateMetaInfoWriterV1V2(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) { + super(stateMetaInfo); + } + + @Override + public void writeStateMetaInfo(DataOutputView out) throws IOException { + out.writeInt(stateMetaInfo.getStateType().ordinal()); + out.writeUTF(stateMetaInfo.getName()); + + new TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(out); + new TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(out); + } + } + + static class KeyedBackendStateMetaInfoWriterV3<N, S> extends AbstractKeyedBackendStateMetaInfoWriter<N, S> { + + public KeyedBackendStateMetaInfoWriterV3(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) { + super(stateMetaInfo); + } + + @Override + public void writeStateMetaInfo(DataOutputView out) throws IOException { + out.writeInt(stateMetaInfo.getStateType().ordinal()); + out.writeUTF(stateMetaInfo.getName()); + + // write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures + try ( + ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos(); + DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) { + + new TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(outViewWrapper); + + // write current offset, which represents the start offset of the state serializer + out.writeInt(outWithPos.getPosition()); + new TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(outViewWrapper); + + // write 
current offset, which represents the start of the configuration snapshots + out.writeInt(outWithPos.getPosition()); + TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, stateMetaInfo.getNamespaceSerializerConfigSnapshot()); + TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, stateMetaInfo.getStateSerializerConfigSnapshot()); + + // write total number of bytes and then flush + out.writeInt(outWithPos.getPosition()); + out.write(outWithPos.getBuf(), 0, outWithPos.getPosition()); + } + } + } + + + // ------------------------------------------------------------------------------- + // Readers + // - v1: Flink 1.2.x + // - v2: Flink 1.3.x + // ------------------------------------------------------------------------------- + + public static KeyedBackendStateMetaInfoReader getReaderForVersion( + int version, ClassLoader userCodeClassLoader) { + + switch (version) { + case 1: + case 2: + return new KeyedBackendStateMetaInfoReaderV1V2<>(userCodeClassLoader); + + // current version + case KeyedBackendSerializationProxy.VERSION: + return new KeyedBackendStateMetaInfoReaderV3<>(userCodeClassLoader); + + default: + // guard for future + throw new IllegalStateException( + "Unrecognized keyed backend state meta info reader version: " + version); + } + } + + public interface KeyedBackendStateMetaInfoReader<N, S> { + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException; + } + + static abstract class AbstractKeyedBackendStateMetaInfoReader implements KeyedBackendStateMetaInfoReader { + + protected final ClassLoader userCodeClassLoader; + + public AbstractKeyedBackendStateMetaInfoReader(ClassLoader userCodeClassLoader) { + this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader); + } + + } + + static class KeyedBackendStateMetaInfoReaderV1V2<N, S> extends AbstractKeyedBackendStateMetaInfoReader { + + public KeyedBackendStateMetaInfoReaderV1V2(ClassLoader userCodeClassLoader) { + 
super(userCodeClassLoader); + } + + @Override + public RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException { + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> metaInfo = + new RegisteredKeyedBackendStateMetaInfo.Snapshot<>(); + + metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]); + metaInfo.setName(in.readUTF()); + + final TypeSerializerSerializationProxy<N> namespaceSerializerProxy = + new TypeSerializerSerializationProxy<>(userCodeClassLoader); + namespaceSerializerProxy.read(in); + metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer()); + + final TypeSerializerSerializationProxy<S> stateSerializerProxy = + new TypeSerializerSerializationProxy<>(userCodeClassLoader); + stateSerializerProxy.read(in); + metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer()); + + // older versions do not contain the configuration snapshot + metaInfo.setNamespaceSerializerConfigSnapshot(null); + metaInfo.setStateSerializerConfigSnapshot(null); + + return metaInfo; + } + } + + static class KeyedBackendStateMetaInfoReaderV3<N, S> extends AbstractKeyedBackendStateMetaInfoReader { + + public KeyedBackendStateMetaInfoReaderV3(ClassLoader userCodeClassLoader) { + super(userCodeClassLoader); + } + + @Override + public RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException { + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> metaInfo = + new RegisteredKeyedBackendStateMetaInfo.Snapshot<>(); + + metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]); + metaInfo.setName(in.readUTF()); + + // read offsets + int stateSerializerStartOffset = in.readInt(); + int configSnapshotsStartOffset = in.readInt(); + + int totalBytes = in.readInt(); + + byte[] buffer = new byte[totalBytes]; + in.readFully(buffer); + + ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer); + DataInputViewStreamWrapper 
inViewWrapper = new DataInputViewStreamWrapper(inWithPos); + + try { + final TypeSerializerSerializationProxy<N> namespaceSerializerProxy = + new TypeSerializerSerializationProxy<>(userCodeClassLoader); + namespaceSerializerProxy.read(inViewWrapper); + metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer()); + } catch (IOException e) { + metaInfo.setNamespaceSerializer(null); + } + + // make sure we start from the state serializer bytes position + inWithPos.setPosition(stateSerializerStartOffset); + try { + final TypeSerializerSerializationProxy<S> stateSerializerProxy = + new TypeSerializerSerializationProxy<>(userCodeClassLoader); + stateSerializerProxy.read(inViewWrapper); + metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer()); + } catch (IOException e) { + metaInfo.setStateSerializer(null); + } + + // make sure we start from the config snapshot bytes position + inWithPos.setPosition(configSnapshotsStartOffset); + metaInfo.setNamespaceSerializerConfigSnapshot( + TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader)); + metaInfo.setStateSerializerConfigSnapshot( + TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader)); + + return metaInfo; + } + } +}
