This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit da1a402e3d8b7c696e034e466f70a20d5335cc13 Author: klion26 <[email protected]> AuthorDate: Wed Apr 15 19:49:58 2020 +0800 [FLINK-13632] Port EnumValueSerializer upgrade test to TypeSerializerUpgradeTestBase --- .../resources/flink-1.6-scala-enum-serializer-data | Bin 40 -> 0 bytes .../flink-1.6-scala-enum-serializer-snapshot | Bin 1865 -> 0 bytes .../resources/flink-1.7-scala-enum-serializer-data | Bin 40 -> 0 bytes .../flink-1.7-scala-enum-serializer-snapshot | Bin 1853 -> 0 bytes ... => EnumValueSerializerCompatibilityTest.scala} | 6 +- .../EnumValueSerializerSnapshotMigrationTest.scala | 61 ------ .../typeutils/EnumValueSerializerUpgradeTest.scala | 231 +++++---------------- 7 files changed, 57 insertions(+), 241 deletions(-) diff --git a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-data b/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-data deleted file mode 100644 index 6459c29..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-snapshot b/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-snapshot deleted file mode 100644 index 3cf1738..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-data b/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-data deleted file mode 100644 index 6459c29..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-snapshot b/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-snapshot deleted file mode 100644 index 99ea254..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-snapshot and /dev/null differ diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala similarity index 97% copy from flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala copy to flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala index 97c2b05..c3cdda4 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala @@ -33,7 +33,7 @@ import scala.reflect.NameTransformer import scala.tools.nsc.reporters.ConsoleReporter import scala.tools.nsc.{GenericRunnerSettings, Global} -class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike { +class EnumValueSerializerCompatibilityTest extends TestLogger with JUnitSuiteLike { private val _tempFolder = new TemporaryFolder() @@ -123,7 +123,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike { def checkCompatibility(enumSourceA: String, enumSourceB: String) : TypeSerializerSchemaCompatibility[Enumeration#Value] = { - import EnumValueSerializerUpgradeTest._ + import EnumValueSerializerCompatibilityTest._ val classLoader = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceA) @@ -156,7 +156,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike { } } -object EnumValueSerializerUpgradeTest { +object EnumValueSerializerCompatibilityTest { def compileAndLoadEnum(root: File, filename: String, source: String): ClassLoader = { val file = writeSourceFile(root, filename, source) diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerSnapshotMigrationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerSnapshotMigrationTest.scala deleted file mode 100644 index daff774..0000000 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerSnapshotMigrationTest.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.typeutils - -import java.util - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshotMigrationTestBase} -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase.{TestSpecification, TestSpecifications} -import org.apache.flink.api.scala.createTypeInformation -import org.apache.flink.testutils.migration.MigrationVersion -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -/** - * Migration tests for the [[EnumValueSerializer]]. - */ -@RunWith(classOf[Parameterized]) -class EnumValueSerializerSnapshotMigrationTest( - spec: TestSpecification[Letters.Value]) -extends TypeSerializerSnapshotMigrationTestBase[Letters.Value](spec) {} - -object EnumValueSerializerSnapshotMigrationTest { - - private val supplier = - new util.function.Supplier[EnumValueSerializer[Letters.type]] { - override def get(): EnumValueSerializer[Letters.type] = - new EnumValueSerializer(Letters) - } - - @Parameterized.Parameters(name = "Test Specification = {0}") - def testSpecifications(): util.Collection[TestSpecification[_]] = { - val spec = - new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7) - - spec.add( - "scala-enum-serializer", - classOf[EnumValueSerializer[Letters.Value]], - classOf[ScalaEnumSerializerSnapshot[Letters.Value]], - supplier - ) - - spec.get() - } -} diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala index 97c2b05..9c96df6 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala @@ -18,191 +18,68 @@ package org.apache.flink.api.scala.typeutils -import java.io._ -import java.net.{URL, URLClassLoader} - -import org.apache.flink.api.common.typeutils.{TypeSerializerSchemaCompatibility, TypeSerializerSnapshotSerializationUtil} -import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} -import org.apache.flink.util.TestLogger -import org.junit.rules.TemporaryFolder -import org.junit.{Rule, Test} -import org.junit.Assert._ -import org.scalatest.junit.JUnitSuiteLike - -import scala.reflect.NameTransformer -import scala.tools.nsc.reporters.ConsoleReporter -import scala.tools.nsc.{GenericRunnerSettings, Global} - -class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike { - - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - val enumName = "EnumValueSerializerUpgradeTestEnum" - - val enumA = - s""" - |object $enumName extends Enumeration { - | val A, B, C = Value - |} - """.stripMargin - - val enumB = - s""" - |object $enumName extends Enumeration { - | val A, B, C, D = Value - |} - """.stripMargin - - val enumC = - s""" - |object $enumName extends Enumeration { - | val A, C = Value - |} - """.stripMargin - - val enumD = - s""" - |object $enumName extends Enumeration { - | val A, C, B = Value - |} - """.stripMargin - - val enumE = - s""" - |object $enumName extends Enumeration { - | val A = Value(42) - | val B = Value(5) - | val C = Value(1337) - |} - """.stripMargin - - /** - * Check that identical enums don't require migration - */ - @Test - def checkIdenticalEnums(): Unit = { - assertTrue(checkCompatibility(enumA, enumA).isCompatibleAsIs) - } - - /** - * Check that appending fields to the enum does not require migration - */ - @Test - def checkAppendedField(): Unit = { - assertTrue(checkCompatibility(enumA, enumB).isCompatibleAsIs) - } - - /** - * Check that removing enum fields makes the snapshot incompatible. - */ - @Test - def checkRemovedField(): Unit = { - assertTrue(checkCompatibility(enumA, enumC).isIncompatible) - } - - /** - * Check that changing the enum field order makes the snapshot incompatible. - */ - @Test - def checkDifferentFieldOrder(): Unit = { - assertTrue(checkCompatibility(enumA, enumD).isIncompatible) - } - - /** - * Check that changing the enum ids causes a migration - */ - @Test - def checkDifferentIds(): Unit = { - assertTrue( - "Different ids should be incompatible.", - checkCompatibility(enumA, enumE).isIncompatible) - } - - def checkCompatibility(enumSourceA: String, enumSourceB: String) - : TypeSerializerSchemaCompatibility[Enumeration#Value] = { - import EnumValueSerializerUpgradeTest._ - - val classLoader = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceA) - - val enum = instantiateEnum[Enumeration](classLoader, enumName) - - val enumValueSerializer = new EnumValueSerializer(enum) - val snapshot = enumValueSerializer.snapshotConfiguration() - - val baos = new ByteArrayOutputStream() - val output = new DataOutputViewStreamWrapper(baos) - TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot( - output, snapshot, enumValueSerializer) - - output.close() - baos.close() - - val bais = new ByteArrayInputStream(baos.toByteArray) - val input= new DataInputViewStreamWrapper(bais) - - val classLoader2 = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceB) - - val snapshot2 = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot( - input, - classLoader2, - enumValueSerializer) - val enum2 = instantiateEnum[Enumeration](classLoader2, enumName) - - val enumValueSerializer2 = new EnumValueSerializer(enum2) - snapshot2.resolveSchemaCompatibility(enumValueSerializer2) - } -} +import java.util + +import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase.TestSpecification +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerMatchers, TypeSerializerSchemaCompatibility, TypeSerializerUpgradeTestBase} +import org.apache.flink.testutils.migration.MigrationVersion +import org.hamcrest.Matcher +import org.hamcrest.Matchers.is +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +/** + * A [[TypeSerializerUpgradeTestBase]] for [[EnumValueSerializer]]. + */ +@RunWith(classOf[Parameterized]) +class EnumValueSerializerUpgradeTest( + spec: TestSpecification[Letters.Value, Letters.Value]) +extends TypeSerializerUpgradeTestBase[Letters.Value, Letters.Value](spec) {} object EnumValueSerializerUpgradeTest { - def compileAndLoadEnum(root: File, filename: String, source: String): ClassLoader = { - val file = writeSourceFile(root, filename, source) - compileScalaFile(file) - - new URLClassLoader( - Array[URL](root.toURI.toURL), - Thread.currentThread().getContextClassLoader) - } - - def instantiateEnum[T <: Enumeration](classLoader: ClassLoader, enumName: String): T = { - val clazz = classLoader.loadClass(enumName + "$").asInstanceOf[Class[_ <: Enumeration]] - val field = clazz.getField(NameTransformer.MODULE_INSTANCE_NAME) - - field.get(null).asInstanceOf[T] + private val supplier = + new util.function.Supplier[EnumValueSerializer[Letters.type]] { + override def get(): EnumValueSerializer[Letters.type] = + new EnumValueSerializer(Letters) + } + + @Parameterized.Parameters(name = "Test Specification = {0}") + def testSpecifications(): util.Collection[TestSpecification[_, _]] = { + val testSpecifications = + new util.ArrayList[TypeSerializerUpgradeTestBase.TestSpecification[_, _]] + + for (migrationVersion <- TypeSerializerUpgradeTestBase.MIGRATION_VERSIONS) { + testSpecifications.add( + new TypeSerializerUpgradeTestBase.TestSpecification[Letters.Value, Letters.Value]( + "scala-enum-serializer", + migrationVersion, + classOf[EnumValueSerializerSetup], + classOf[EnumValueSerializerVerifier])) + } + + testSpecifications } - def writeSourceFile(root: File, filename: String, source: String): File = { - val file = new File(root, filename) - val fileWriter = new FileWriter(file) - - fileWriter.write(source) - - fileWriter.close() - - file + /** + * This class is only public to work with + * [[org.apache.flink.api.common.typeutils.ClassRelocator]]. + */ + final class EnumValueSerializerSetup + extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[Letters.Value] { + override def createPriorSerializer: TypeSerializer[Letters.Value] = supplier.get() + + override def createTestData: Letters.Value = Letters.A } - def compileScalaFile(file: File): Unit = { - val in = new BufferedReader(new StringReader("")) - val out = new PrintWriter(new BufferedWriter( - new OutputStreamWriter(System.out))) - - val settings = new GenericRunnerSettings(out.println _) + final class EnumValueSerializerVerifier extends + TypeSerializerUpgradeTestBase.UpgradeVerifier[Letters.Value] { + override def createUpgradedSerializer: TypeSerializer[Letters.Value] = supplier.get() - // use the java classpath so that scala libraries are available to the compiler - settings.usejavacp.value = true - settings.outdir.value = file.getParent + override def testDataMatcher: Matcher[Letters.Value] = is(Letters.A) - val reporter = new ConsoleReporter(settings) - val global = new Global(settings, reporter) - val run = new global.Run - - run.compile(List(file.getAbsolutePath)) - - reporter.printSummary() + override def schemaCompatibilityMatcher(version: MigrationVersion): + Matcher[TypeSerializerSchemaCompatibility[Letters.Value]] = + TypeSerializerMatchers.isCompatibleAsIs[Letters.Value]() } } -
