Repository: flink Updated Branches: refs/heads/release-1.2 5f81d20ba -> a7644b171
[FLINK-5484] [serialization] Add test for registered Kryo types Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55483b71 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55483b71 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55483b71 Branch: refs/heads/release-1.2 Commit: 55483b71f36b84ac57d03a9b83e0e9d9b9b98eab Parents: 5f81d20 Author: Ufuk Celebi <[email protected]> Authored: Tue Jan 17 19:10:33 2017 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Jan 18 12:11:56 2017 +0100 ---------------------------------------------------------------------- .../test/resources/flink_11-kryo_registrations | 86 ++++++++++++++++ .../runtime/KryoGenericTypeSerializerTest.scala | 100 ++++++++++++++++++- pom.xml | 1 + 3 files changed, 183 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/55483b71/flink-tests/src/test/resources/flink_11-kryo_registrations ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/flink_11-kryo_registrations b/flink-tests/src/test/resources/flink_11-kryo_registrations new file mode 100644 index 0000000..7000e62 --- /dev/null +++ b/flink-tests/src/test/resources/flink_11-kryo_registrations @@ -0,0 +1,86 @@ +0,int +1,java.lang.String +2,float +3,boolean +4,byte +5,char +6,short +7,long +8,double +9,void +10,scala.collection.convert.Wrappers$SeqWrapper +11,scala.collection.convert.Wrappers$IteratorWrapper +12,scala.collection.convert.Wrappers$MapWrapper +13,scala.collection.convert.Wrappers$JListWrapper +14,scala.collection.convert.Wrappers$JMapWrapper +15,scala.Some +16,scala.util.Left +17,scala.util.Right +18,scala.collection.immutable.Vector +19,scala.collection.immutable.Set$Set1 +20,scala.collection.immutable.Set$Set2 +21,scala.collection.immutable.Set$Set3 
+22,scala.collection.immutable.Set$Set4 +23,scala.collection.immutable.HashSet$HashTrieSet +24,scala.collection.immutable.Map$Map1 +25,scala.collection.immutable.Map$Map2 +26,scala.collection.immutable.Map$Map3 +27,scala.collection.immutable.Map$Map4 +28,scala.collection.immutable.HashMap$HashTrieMap +29,scala.collection.immutable.Range$Inclusive +30,scala.collection.immutable.NumericRange$Inclusive +31,scala.collection.immutable.NumericRange$Exclusive +32,scala.collection.mutable.BitSet +33,scala.collection.mutable.HashMap +34,scala.collection.mutable.HashSet +35,scala.collection.convert.Wrappers$IterableWrapper +36,scala.Tuple1 +37,scala.Tuple2 +38,scala.Tuple3 +39,scala.Tuple4 +40,scala.Tuple5 +41,scala.Tuple6 +42,scala.Tuple7 +43,scala.Tuple8 +44,scala.Tuple9 +45,scala.Tuple10 +46,scala.Tuple11 +47,scala.Tuple12 +48,scala.Tuple13 +49,scala.Tuple14 +50,scala.Tuple15 +51,scala.Tuple16 +52,scala.Tuple17 +53,scala.Tuple18 +54,scala.Tuple19 +55,scala.Tuple20 +56,scala.Tuple21 +57,scala.Tuple22 +58,scala.Tuple1$mcJ$sp +59,scala.Tuple1$mcI$sp +60,scala.Tuple1$mcD$sp +61,scala.Tuple2$mcJJ$sp +62,scala.Tuple2$mcJI$sp +63,scala.Tuple2$mcJD$sp +64,scala.Tuple2$mcIJ$sp +65,scala.Tuple2$mcII$sp +66,scala.Tuple2$mcID$sp +67,scala.Tuple2$mcDJ$sp +68,scala.Tuple2$mcDI$sp +69,scala.Tuple2$mcDD$sp +70,scala.Symbol +71,scala.reflect.ClassTag +72,scala.runtime.BoxedUnit +73,java.util.Arrays$ArrayList +74,java.util.BitSet +75,java.util.PriorityQueue +76,java.util.regex.Pattern +77,java.sql.Date +78,java.sql.Time +79,java.sql.Timestamp +80,java.net.URI +81,java.net.InetSocketAddress +82,java.util.UUID +83,java.util.Locale +84,java.text.SimpleDateFormat +85,org.apache.avro.generic.GenericData$Array http://git-wip-us.apache.org/repos/asf/flink/blob/55483b71/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala index 08a0a96..e001799 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala @@ -17,17 +17,19 @@ */ package org.apache.flink.api.scala.runtime -import com.esotericsoftware.kryo.{Kryo, Serializer} -import com.esotericsoftware.kryo.io.{Input, Output} +import java.io._ +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, Serializer} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.SerializerTestInstance import org.apache.flink.api.java.typeutils.GenericTypeInfo - +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.joda.time.LocalDate - import org.junit.Test +import scala.collection.mutable +import scala.io.Source import scala.reflect._ class KryoGenericTypeSerializerTest { @@ -146,6 +148,96 @@ class KryoGenericTypeSerializerTest { runTests(list) } + /** + * Tests that the registered classes in Kryo did not change. + * + * Once we have proper serializer versioning this test will become obsolete. + * But currently a change in the serializers can break savepoint backwards + * compatibility between Flink versions.
+ */ + @Test + def testDefaultKryoRegisteredClassesDidNotChange(): Unit = { + // Previous registration (id => registered class (Class#getName)) + val previousRegistrations: mutable.HashMap[Int, String] = mutable.HashMap[Int, String]() + + val stream = Thread.currentThread().getContextClassLoader() + .getResourceAsStream("flink_11-kryo_registrations") + Source.fromInputStream(stream).getLines().foreach{ + line => + val Array(id, registeredClass) = line.split(",") + previousRegistrations.put(id.toInt, registeredClass) + } + + // Get Kryo and verify that the registered IDs and types in + // Kryo have not changed compared to the provided registrations + // file. + val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo + val nextId = kryo.getNextRegistrationId + for (i <- 0 until nextId) { + val registration = kryo.getRegistration(i) + + previousRegistrations.get(registration.getId) match { + case None => throw new IllegalStateException(s"Expected no entry with ID " + + s"${registration.getId}, but got one for type ${registration.getType.getName}. This " + + s"can lead to registered user types being deserialized with the wrong serializer when " + + s"restoring a savepoint.") + case Some(registeredClass) => + if (registeredClass != registration.getType.getName) { + throw new IllegalStateException(s"Expected type $registeredClass with " + + s"ID ${registration.getId}, but got ${registration.getType.getName}.") + } + } + } + + // Verify number of registrations (required to check if current number of + // registrations is less than before). + if (previousRegistrations.size != nextId) { + throw new IllegalStateException(s"Number of registered classes changed (previously " + + s"${previousRegistrations.size}, but now $nextId).
This can lead to registered user " + s"types being deserialized with the wrong serializer when restoring a savepoint.") + } + } + + /** + * Creates a Kryo serializer and writes the default registrations out to a + * comma separated file with one entry per line: + * + * id,class + * + * The produced file is used to check that the registered IDs don't change + * in future Flink versions. + * + * This method is not used in the tests, but documents how the test file + * has been created and can be used to re-create it if needed. + * + * @param filePath File path to write registrations to + */ + private def writeDefaultKryoRegistrations(filePath: String) = { + val file = new File(filePath) + if (file.exists()) { + file.delete() + } + + val writer = new BufferedWriter(new FileWriter(file)) + + try { + val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo + + val nextId = kryo.getNextRegistrationId + for (i <- 0 until nextId) { + val registration = kryo.getRegistration(i) + val str = registration.getId + "," + registration.getType.getName + writer.write(str, 0, str.length) + writer.newLine() + } + + println(s"Created file with registrations at $file.") + } finally { + writer.close() + } + } + + case class ComplexType(id: String, number: Int, values: List[Int]){ override def equals(obj: Any): Boolean ={ if(obj != null && obj.isInstanceOf[ComplexType]){ http://git-wip-us.apache.org/repos/asf/flink/blob/55483b71/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index cdb4e2d..0493452 100644 --- a/pom.xml +++ b/pom.xml @@ -874,6 +874,7 @@ under the License. <!-- Test Data.
--> <exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude> + <exclude>flink-tests/src/test/resources/flink_11-kryo_registrations</exclude> <exclude>flink-connectors/flink-avro/src/test/resources/avro/*.avsc</exclude> <exclude>out/test/flink-avro/avro/user.avsc</exclude> <exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>
