This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 760e9522efac757f37344ad12cd797b33c2a96ef Author: klion26 <[email protected]> AuthorDate: Tue Apr 14 16:36:08 2020 +0800 [FLINK-13632] Port EnumSerializer upgrade test to TypeSerializerUpgradeTestBase --- ...t.java => EnumSerializerCompatibilityTest.java} | 2 +- .../base/EnumSerializerSnapshotMigrationTest.java | 89 ---------- .../typeutils/base/EnumSerializerUpgradeTest.java | 195 ++++++++++++++------- .../test/resources/flink-1.6-enum-serializer-data | Bin 40 -> 0 bytes .../resources/flink-1.6-enum-serializer-snapshot | Bin 949 -> 0 bytes .../test/resources/flink-1.7-enum-serializer-data | Bin 40 -> 0 bytes .../resources/flink-1.7-enum-serializer-snapshot | Bin 937 -> 0 bytes 7 files changed, 129 insertions(+), 157 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerCompatibilityTest.java similarity index 98% copy from flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java copy to flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerCompatibilityTest.java index 93643b3..f15f76f3 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerCompatibilityTest.java @@ -35,7 +35,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -public class EnumSerializerUpgradeTest extends TestLogger { +public class EnumSerializerCompatibilityTest extends TestLogger { @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java deleted file mode 100644 index d818974..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.typeutils.base; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; -import org.apache.flink.testutils.migration.MigrationVersion; - -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.Collection; - -import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer; -import static org.apache.flink.api.common.typeutils.base.TestEnum.BAR; -import static org.apache.flink.api.common.typeutils.base.TestEnum.EMMA; -import static org.apache.flink.api.common.typeutils.base.TestEnum.FOO; -import static org.apache.flink.api.common.typeutils.base.TestEnum.NATHANIEL; -import static org.apache.flink.api.common.typeutils.base.TestEnum.PAULA; -import static org.apache.flink.api.common.typeutils.base.TestEnum.PETER; - -/** - * Migration tests for {@link EnumSerializer}. - */ -@RunWith(Parameterized.class) -public class EnumSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<TestEnum> { - private static final String SPEC_NAME = "enum-serializer"; - - public EnumSerializerSnapshotMigrationTest(TestSpecification<TestEnum> enumSerializer) { - super(enumSerializer); - } - - private static TestEnum[] previousEnumValues = {FOO, BAR, PETER, NATHANIEL, EMMA, PAULA}; - - @SuppressWarnings("unchecked") - @Parameterized.Parameters(name = "Test Specification = {0}") - public static Collection<TestSpecification<?>> testSpecifications() { - - final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); - - testSpecifications.addWithCompatibilityMatcher( - SPEC_NAME, - EnumSerializer.class, - EnumSerializer.EnumSerializerSnapshot.class, - () -> new EnumSerializer(TestEnum.class), - isCompatibleWithReconfiguredSerializer(enumSerializerWith(previousEnumValues)) - ); - - return testSpecifications.get(); - } - - private static Matcher<? extends TypeSerializer<TestEnum>> enumSerializerWith(final TestEnum[] expectedEnumValues) { - return new TypeSafeMatcher<EnumSerializer<TestEnum>>() { - - @Override - protected boolean matchesSafely(EnumSerializer<TestEnum> reconfiguredSerialized) { - return Arrays.equals(reconfiguredSerialized.getValues(), expectedEnumValues); - } - - @Override - public void describeTo(Description description) { - description - .appendText("EnumSerializer with values ") - .appendValueList("{", ", ", "}", expectedEnumValues); - } - }; - } -} - diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java index 93643b3..e1ba7e2 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java @@ -18,100 +18,161 @@ package org.apache.flink.api.common.typeutils.base; +import org.apache.flink.api.common.typeutils.ClassRelocator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerMatchers; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.testutils.ClassLoaderUtils; -import org.apache.flink.util.TestLogger; +import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase; +import org.apache.flink.testutils.migration.MigrationVersion; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; -public class EnumSerializerUpgradeTest extends TestLogger { +import static org.apache.flink.api.common.typeutils.base.TestEnum.EMMA; +import static org.hamcrest.Matchers.is; - @ClassRule - public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - - private static final String ENUM_NAME = "EnumSerializerUpgradeTestEnum"; +/** + * Migration tests for {@link EnumSerializer}. + */ +@RunWith(Parameterized.class) +public class EnumSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<TestEnum, TestEnum> { + private static final String SPEC_NAME = "enum-serializer"; - private static final String ENUM_A = "public enum " + ENUM_NAME + " { A, B, C }"; - private static final String ENUM_B = "public enum " + ENUM_NAME + " { A, B, C, D }"; - private static final String ENUM_C = "public enum " + ENUM_NAME + " { A, C }"; - private static final String ENUM_D = "public enum " + ENUM_NAME + " { A, C, B }"; + public EnumSerializerUpgradeTest(TestSpecification<TestEnum, TestEnum> enumSerializer) { + super(enumSerializer); + } - /** - * Check that identical enums don't require migration - */ - @Test - public void checkIndenticalEnums() throws Exception { - Assert.assertTrue(checkCompatibility(ENUM_A, ENUM_A).isCompatibleAsIs()); + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection<TestSpecification<?, ?>> testSpecifications() throws Exception { + + ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>(); + for (MigrationVersion migrationVersion : MIGRATION_VERSIONS) { + testSpecifications.add( + new TestSpecification<>( + SPEC_NAME, + migrationVersion, + EnumSerializerSetup.class, + EnumSerializerVerifier.class)); + testSpecifications.add( + new TestSpecification<>( + SPEC_NAME + "reconfig", + migrationVersion, + EnumSerializerReconfigSetup.class, + EnumSerializerReconfigVerifier.class)); + } + return testSpecifications; } - /** - * Check that appending fields to the enum does not require migration - */ - @Test - public void checkAppendedField() throws Exception { - Assert.assertTrue(checkCompatibility(ENUM_A, ENUM_B).isCompatibleWithReconfiguredSerializer()); + private static Matcher<? extends TypeSerializer<TestEnum>> enumSerializerWith(final TestEnum[] expectedEnumValues) { + return new TypeSafeMatcher<EnumSerializer<TestEnum>>() { + + @Override + protected boolean matchesSafely(EnumSerializer<TestEnum> reconfiguredSerialized) { + return Arrays.equals(reconfiguredSerialized.getValues(), expectedEnumValues); + } + + @Override + public void describeTo(Description description) { + description + .appendText("EnumSerializer with values ") + .appendValueList("{", ", ", "}", expectedEnumValues); + } + }; } + // ---------------------------------------------------------------------------------------------- + // Specification for "enum-serializer" + // ---------------------------------------------------------------------------------------------- + /** - * Check that removing enum fields makes the snapshot incompatible + * This class is only public to work with {@link org.apache.flink.api.common.typeutils.ClassRelocator}. */ - @Test(expected = IllegalStateException.class) - public void removingFieldShouldBeIncompatible() throws Exception { - Assert.assertTrue(checkCompatibility(ENUM_A, ENUM_C).isIncompatible()); + public static final class EnumSerializerSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<TestEnum> { + @SuppressWarnings("unchecked") + @Override + public TypeSerializer<TestEnum> createPriorSerializer() { + return new EnumSerializer(TestEnum.class); + } + + @Override + public TestEnum createTestData() { + return EMMA; + } } /** - * Check that changing the enum field order don't require migration + * This class is only public to work with {@link org.apache.flink.api.common.typeutils.ClassRelocator}. */ - @Test - public void checkDifferentFieldOrder() throws Exception { - Assert.assertTrue(checkCompatibility(ENUM_A, ENUM_D).isCompatibleWithReconfiguredSerializer()); - } + public static final class EnumSerializerVerifier implements TypeSerializerUpgradeTestBase.UpgradeVerifier<TestEnum> { + @SuppressWarnings("unchecked") + @Override + public TypeSerializer<TestEnum> createUpgradedSerializer() { + return new EnumSerializer(TestEnum.class); + } - @SuppressWarnings("unchecked") - private static TypeSerializerSchemaCompatibility checkCompatibility(String enumSourceA, String enumSourceB) - throws IOException, ClassNotFoundException { + @Override + public Matcher<TestEnum> testDataMatcher() { + return is(EMMA); + } - ClassLoader classLoader = ClassLoaderUtils.compileAndLoadJava( - temporaryFolder.newFolder(), ENUM_NAME + ".java", enumSourceA); + @Override + public Matcher<TypeSerializerSchemaCompatibility<TestEnum>> schemaCompatibilityMatcher(MigrationVersion version) { + return TypeSerializerMatchers.isCompatibleAsIs(); + } + } - EnumSerializer enumSerializer = new EnumSerializer(classLoader.loadClass(ENUM_NAME)); + /** + * This class is only public to work with {@link org.apache.flink.api.common.typeutils.ClassRelocator}. + */ + public static final class EnumSerializerReconfigSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<EnumSerializerReconfigSetup.EnumBefore> { + @ClassRelocator.RelocateClass("TestEnumSerializerReconfig") + public enum EnumBefore { + FOO, BAR, PETER, NATHANIEL, EMMA, PAULA + } - TypeSerializerSnapshot snapshot = enumSerializer.snapshotConfiguration(); - byte[] snapshotBytes; - try ( - ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper outputViewStreamWrapper = new DataOutputViewStreamWrapper(outBuffer)) { + @SuppressWarnings("unchecked") + @Override + public TypeSerializer<EnumBefore> createPriorSerializer() { + return new EnumSerializer(EnumBefore.class); + } - TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot( - outputViewStreamWrapper, snapshot, enumSerializer); - snapshotBytes = outBuffer.toByteArray(); + @Override + public EnumBefore createTestData() { + return EnumBefore.EMMA; } + } - ClassLoader classLoader2 = ClassLoaderUtils.compileAndLoadJava( - temporaryFolder.newFolder(), ENUM_NAME + ".java", enumSourceB); + /** + * This class is only public to work with {@link org.apache.flink.api.common.typeutils.ClassRelocator}. + */ + public static final class EnumSerializerReconfigVerifier implements TypeSerializerUpgradeTestBase.UpgradeVerifier<EnumSerializerReconfigVerifier.EnumAfter> { + @ClassRelocator.RelocateClass("TestEnumSerializerReconfig") + public enum EnumAfter { + FOO, BAR, PETER, PAULA, NATHANIEL, EMMA + } - TypeSerializerSnapshot restoredSnapshot; - try ( - ByteArrayInputStream inBuffer = new ByteArrayInputStream(snapshotBytes); - DataInputViewStreamWrapper inputViewStreamWrapper = new DataInputViewStreamWrapper(inBuffer)) { + @SuppressWarnings("unchecked") + @Override + public TypeSerializer<EnumAfter> createUpgradedSerializer() { + return new EnumSerializer(EnumAfter.class); + } - restoredSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot( - inputViewStreamWrapper, classLoader2, enumSerializer); + @Override + public Matcher<EnumAfter> testDataMatcher() { + return is(EnumAfter.EMMA); } - EnumSerializer enumSerializer2 = new EnumSerializer(classLoader2.loadClass(ENUM_NAME)); - return restoredSnapshot.resolveSchemaCompatibility(enumSerializer2); + @Override + public Matcher<TypeSerializerSchemaCompatibility<EnumAfter>> schemaCompatibilityMatcher(MigrationVersion version) { + return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer(); + } } } + diff --git a/flink-core/src/test/resources/flink-1.6-enum-serializer-data b/flink-core/src/test/resources/flink-1.6-enum-serializer-data deleted file mode 100644 index 60d7723..0000000 Binary files a/flink-core/src/test/resources/flink-1.6-enum-serializer-data and /dev/null differ diff --git a/flink-core/src/test/resources/flink-1.6-enum-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-enum-serializer-snapshot deleted file mode 100644 index 9639d72..0000000 Binary files a/flink-core/src/test/resources/flink-1.6-enum-serializer-snapshot and /dev/null differ diff --git a/flink-core/src/test/resources/flink-1.7-enum-serializer-data b/flink-core/src/test/resources/flink-1.7-enum-serializer-data deleted file mode 100644 index 97a46a4..0000000 Binary files a/flink-core/src/test/resources/flink-1.7-enum-serializer-data and /dev/null differ diff --git a/flink-core/src/test/resources/flink-1.7-enum-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-enum-serializer-snapshot deleted file mode 100644 index b8add7c1..0000000 Binary files a/flink-core/src/test/resources/flink-1.7-enum-serializer-snapshot and /dev/null differ
