[FLINK-6883] [tests] Add migration tests for Scala jobs

This commit adds migration ITCases for jobs written using the Scala API.
An extra concern for migration of Scala jobs is that Scala case classes
and collections use anonymous generated serializers, which may affect
state restore.

This closes #4103.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ae09955
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ae09955
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ae09955

Branch: refs/heads/release-1.3
Commit: 6ae09955000783109c526005aab7e8a85f27dddc
Parents: 39c8270
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 11 15:31:42 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 07:21:00 2017 +0200

----------------------------------------------------------------------
 .../api/scala/typeutils/OptionTypeInfo.scala    |   5 +-
 ...gration-itcase-flink1.2-jobmanager-savepoint | Bin 0 -> 92818 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 92818 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 ...gration-itcase-flink1.2-jobmanager-savepoint | Bin 0 -> 92818 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 92818 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../scala/migration/MigrationTestTypes.scala    |  28 ++
 .../ScalaSerializersMigrationTest.scala         | 110 +++++++
 .../StatefulJobSavepointMigrationITCase.scala   | 303 +++++++++++++++++++
 12 files changed, 445 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 73fe580..d2e66a5 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Public, PublicEvolving}
+import org.apache.flink.annotation.{Public, PublicEvolving, VisibleForTesting}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
@@ -85,4 +85,7 @@ class OptionTypeInfo[A, T <: Option[A]](private val 
elemTypeInfo: TypeInformatio
   override def hashCode: Int = {
     elemTypeInfo.hashCode()
   }
+
+  /**
+   * Returns the [[TypeInformation]] of the value wrapped by the [[Option]] type that this
+   * type information describes. Exposed only so that tests can inspect the nested type info.
+   */
+  @VisibleForTesting
+  def getElemTypeInfo: TypeInformation[A] = this.elemTypeInfo
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
 
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
new file mode 100644
index 0000000..3d0f8c5
Binary files /dev/null and 
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
 
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
new file mode 100644
index 0000000..5a763df
Binary files /dev/null and 
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
 
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
new file mode 100644
index 0000000..e183e51
Binary files /dev/null and 
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
 
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..612bc1b
Binary files /dev/null and 
b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
 
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
new file mode 100644
index 0000000..9b90ac8
Binary files /dev/null and 
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
 
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
new file mode 100644
index 0000000..99777a1
Binary files /dev/null and 
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
 
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
new file mode 100644
index 0000000..6adf433
Binary files /dev/null and 
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
 
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..d9eaa72
Binary files /dev/null and 
b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
new file mode 100644
index 0000000..4ae57c4
--- /dev/null
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+/** Flat case class whose generated serializer is written into the migration test state. */
+case class CustomCaseClass(a: String, b: Long)
+
+/** Case class with a nested case class field, covering nested generated serializers. */
+case class CustomCaseClassWithNesting(a: Long, nested: CustomCaseClass)
+
+/** Scala `Enumeration` used as a state type in the migration tests. */
+object CustomEnum extends Enumeration {
+  type CustomEnum = Value
+
+  // declared in the same order as before, so the ids stay 0..3
+  val ONE = Value
+  val TWO = Value
+  val THREE = Value
+  val FOUR = Value
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
new file mode 100644
index 0000000..a2edc90
--- /dev/null
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils._
+import org.junit.{Assert, Test}
+
+import scala.util.Try
+
+class ScalaSerializersMigrationTest {
+
+  /**
+   * Verifies that the generated classnames for anonymous Scala serializers remain the same.
+   *
+   * The classnames in this test are collected from running the same type information
+   * generation code in previous version branches. They should not change across different
+   * Flink versions.
+   *
+   * Each `createTypeInformation` call yields an anonymous `TypeInformation` class, which in
+   * turn creates an anonymous `TypeSerializer` (its name is nested inside the type info's
+   * name). The asserts pin both names for the case class type info itself and for the nested
+   * element/field type infos reached via `getTypeAt`, `elementTypeInfo`, `elemTypeInfo`,
+   * `getElemTypeInfo` and `leftTypeInfo`. The outer Try/Option/Either type info classes and
+   * the `String` side of `Either` are not asserted.
+   *
+   * NOTE(review): the `$$anon$N` suffixes appear to be assigned by the compiler per source
+   * file, so adding, removing or reordering anonymous classes (including further
+   * `createTypeInformation` calls) in this file will presumably shift them. Keep the statement
+   * order as-is and re-check the expected names against the previous branches before editing.
+   */
+  @Test
+  def testStableAnonymousClassnameGeneration(): Unit = {
+    val caseClassInfo = createTypeInformation[CustomCaseClass]
+    val caseClassWithNestingInfo =
+      createTypeInformation[CustomCaseClassWithNesting]
+        .asInstanceOf[CaseClassTypeInfo[_]]
+    val traversableInfo =
+      createTypeInformation[List[CustomCaseClass]]
+        .asInstanceOf[TraversableTypeInfo[_,_]]
+    val tryInfo =
+      createTypeInformation[Try[CustomCaseClass]]
+        .asInstanceOf[TryTypeInfo[_,_]]
+    val optionInfo =
+      createTypeInformation[Option[CustomCaseClass]]
+        .asInstanceOf[OptionTypeInfo[_,_]]
+    val eitherInfo =
+      createTypeInformation[Either[CustomCaseClass, String]]
+        .asInstanceOf[EitherTypeInfo[_,_,_]]
+
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$8",
+      caseClassInfo.getClass.getName)
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$8$$anon$1",
+      caseClassInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9",
+      caseClassWithNestingInfo.getClass.getName)
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$3",
+      caseClassWithNestingInfo.createSerializer(new 
ExecutionConfig).getClass.getName)
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$10",
+      caseClassWithNestingInfo.getTypeAt("nested").getClass.getName)
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$10$$anon$2",
+      caseClassWithNestingInfo.getTypeAt("nested")
+        .createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$16",
+      traversableInfo.getClass.getName)
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$16$$anon$12",
+      traversableInfo.createSerializer(new ExecutionConfig).getClass.getName)
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$11",
+      traversableInfo.elementTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$11$$anon$4",
+      traversableInfo.elementTypeInfo.createSerializer(new 
ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$13",
+      tryInfo.elemTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$13$$anon$5",
+      tryInfo.elemTypeInfo.createSerializer(new 
ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$14",
+      optionInfo.getElemTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$14$$anon$6",
+      optionInfo.getElemTypeInfo.createSerializer(new 
ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$15",
+      eitherInfo.leftTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      
"org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$15$$anon$7",
+      eitherInfo.leftTypeInfo.createSerializer(new 
ExecutionConfig).getClass.getName)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
new file mode 100644
index 0000000..1e67042
--- /dev/null
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+import java.util
+
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, 
ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.memory.MemoryStateBackend
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.runtime.state.{AbstractStateBackend, 
FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.streaming.util.migration.MigrationVersion
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Assume, Ignore, Test}
+
+import scala.util.{Failure, Properties, Try}
+
+object StatefulJobSavepointMigrationITCase {
+
+  /**
+   * Test parameters: every (Flink version, state backend name) pair whose savepoint resource
+   * is restored by `testRestoreSavepoint`.
+   */
+  @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+  def parameters: util.Collection[(MigrationVersion, String)] = {
+    val memoryBackend = AbstractStateBackend.MEMORY_STATE_BACKEND_NAME
+    val rocksDbBackend = AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME
+
+    util.Arrays.asList(
+      (MigrationVersion.v1_2, memoryBackend),
+      (MigrationVersion.v1_2, rocksDbBackend),
+      (MigrationVersion.v1_3, memoryBackend),
+      (MigrationVersion.v1_3, rocksDbBackend))
+  }
+
+  // TODO to generate savepoints for a specific Flink version / backend type, change these
+  // TODO values accordingly, e.g. to generate for 1.3 with RocksDB, set them to
+  // TODO (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME)
+  val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_3
+  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME
+
+  /** Scala version the tests run with, without its last component (e.g. "2.11"). */
+  val SCALA_VERSION: String = {
+    // take the second space-separated token of versionString, then drop its last ".<part>"
+    val versionNumber = Properties.versionString.split(" ")(1)
+    val lastDot = versionNumber.lastIndexOf(".")
+    versionNumber.substring(0, lastDot)
+  }
+
+  /** Number of records the counting sink is expected to have received. */
+  val NUM_ELEMENTS = 4
+}
+
+/**
+ * ITCase for migrating Scala state types across different Flink versions.
+ *
+ * Each parameter pairs a Flink version with a state backend name; `testRestoreSavepoint`
+ * restores the matching checked-in savepoint resource and runs the same job on top of it.
+ *
+ * NOTE(review): the job's state uses anonymous serializer classes generated by
+ * `createTypeInformation`, whose binary names presumably depend on all anonymous classes in
+ * this file. Adding, removing or reordering anonymous classes (including the `KeySelector`s
+ * below) may break restores of the checked-in savepoints.
+ */
+@RunWith(classOf[Parameterized])
+class StatefulJobSavepointMigrationITCase(
+    migrationVersionAndBackend: (MigrationVersion, String))
+  extends SavepointMigrationTestBase with Serializable {
+
+  /**
+   * Runs the job and writes a savepoint for the version / backend configured in
+   * `GENERATE_SAVEPOINT_VER` and `GENERATE_SAVEPOINT_BACKEND_TYPE` under `src/test/resources`.
+   * Ignored by default; only enabled manually to (re)generate savepoint resources.
+   */
+  @Ignore
+  @Test
+  def testCreateSavepoint(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
+      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+        env.setStateBackend(new MemoryStateBackend())
+      case _ => throw new UnsupportedOperationException
+    }
+
+    // NOTE(review): this overrides the backend selected above, so the savepoint is always
+    // taken with the MemoryStateBackend. The checked-in "jobmanager" and "rocksdb" savepoints
+    // have identical sizes, which suggests they were produced this way; removing this line
+    // requires regenerating all savepoint resources.
+    env.setStateBackend(new MemoryStateBackend)
+    env.enableCheckpointing(500)
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    env
+      .addSource(new CheckpointedSource(StatefulJobSavepointMigrationITCase.NUM_ELEMENTS))
+      .setMaxParallelism(1)
+      .uid("checkpointedSource")
+      .keyBy(
+        new KeySelector[(Long, Long), Long] {
+          override def getKey(value: (Long, Long)): Long = value._1
+        }
+      )
+      .flatMap(new StatefulFlatMapper)
+      .addSink(new AccumulatorCountingSink)
+
+    executeAndSavepoint(
+      env,
+      s"src/test/resources/stateful-scala" +
+        s"${StatefulJobSavepointMigrationITCase.SCALA_VERSION}" +
+        s"-udf-migration-itcase-flink" +
+        s"${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_VER}" +
+        s"-${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
+      new Tuple2(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
+      )
+    )
+  }
+
+  /**
+   * Restores the savepoint resource for this parameterization's version / backend and runs
+   * the job, passing the expected sink accumulator value to `restoreAndExecute`.
+   */
+  @Test
+  def testRestoreSavepoint(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    migrationVersionAndBackend._2 match {
+      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+        env.setStateBackend(new MemoryStateBackend())
+      case _ => throw new UnsupportedOperationException
+    }
+
+    // NOTE(review): as in testCreateSavepoint, this overrides the backend selected above, so
+    // the "rocksdb" parameterizations are actually restored into the MemoryStateBackend. It is
+    // kept because the checked-in savepoints appear to have been generated the same way.
+    env.setStateBackend(new MemoryStateBackend)
+    env.enableCheckpointing(500)
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    env
+      .addSource(new CheckpointedSource(StatefulJobSavepointMigrationITCase.NUM_ELEMENTS))
+      .setMaxParallelism(1)
+      .uid("checkpointedSource")
+      .keyBy(
+        new KeySelector[(Long, Long), Long] {
+          override def getKey(value: (Long, Long)): Long = value._1
+        }
+      )
+      .flatMap(new StatefulFlatMapper)
+      .addSink(new AccumulatorCountingSink)
+
+    restoreAndExecute(
+      env,
+      SavepointMigrationTestBase.getResourceFilename(
+        s"stateful-scala${StatefulJobSavepointMigrationITCase.SCALA_VERSION}" +
+          s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
+          s"-${migrationVersionAndBackend._2}-savepoint"),
+      new Tuple2(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
+    )
+  }
+
+  @SerialVersionUID(1L)
+  private object CheckpointedSource {
+    var CHECKPOINTED_STRING = "Here be dragons!"
+  }
+
+  /**
+   * Source that emits `(i, i)` for `i` in `[0, numElements)` under the checkpoint lock and
+   * then idles until cancelled. Its operator state ("sourceState") holds a single
+   * [[CustomCaseClass]] that is rewritten on every snapshot.
+   */
+  @SerialVersionUID(1L)
+  private class CheckpointedSource(val numElements: Int)
+      extends SourceFunction[(Long, Long)] with CheckpointedFunction {
+
+    // cleared by cancel() from another thread, so it must be volatile for run() to observe it
+    @volatile private var isRunning = true
+    private var state: ListState[CustomCaseClass] = _
+
+    @throws[Exception]
+    override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
+      ctx.emitWatermark(new Watermark(0))
+      ctx.getCheckpointLock synchronized {
+        var i = 0
+        while (i < numElements) {
+            ctx.collect(i, i)
+            i += 1
+        }
+      }
+      // don't emit a final watermark so that we don't trigger the registered event-time
+      // timers
+      while (isRunning) Thread.sleep(20)
+    }
+
+    def cancel() {
+      isRunning = false
+    }
+
+    override def initializeState(context: FunctionInitializationContext): Unit = {
+      state = context.getOperatorStateStore.getOperatorState(
+        new ListStateDescriptor[CustomCaseClass](
+          "sourceState", createTypeInformation[CustomCaseClass]))
+    }
+
+    override def snapshotState(context: FunctionSnapshotContext): Unit = {
+      state.clear()
+      state.add(CustomCaseClass("Here be dragons!", 123))
+    }
+  }
+
+  @SerialVersionUID(1L)
+  private object AccumulatorCountingSink {
+    var NUM_ELEMENTS_ACCUMULATOR = classOf[AccumulatorCountingSink[_]] + "_NUM_ELEMENTS"
+  }
+
+  /** Sink that adds 1 to the `NUM_ELEMENTS_ACCUMULATOR` accumulator for every record. */
+  @SerialVersionUID(1L)
+  private class AccumulatorCountingSink[T] extends RichSinkFunction[T] {
+
+    private var count: Int = 0
+
+    @throws[Exception]
+    override def open(parameters: Configuration) {
+      super.open(parameters)
+      getRuntimeContext.addAccumulator(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, new IntCounter)
+    }
+
+    @throws[Exception]
+    def invoke(value: T) {
+      count += 1
+      getRuntimeContext.getAccumulator(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR).add(1)
+    }
+  }
+
+  /**
+   * Keyed flat-map that writes a value into each of its Scala-typed value states and forwards
+   * the input unchanged.
+   *
+   * NOTE(review): state names, types and the order of the `createTypeInformation` calls in
+   * `open` presumably have to stay in sync with the checked-in savepoints.
+   */
+  class StatefulFlatMapper extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
+
+    private var caseClassState: ValueState[CustomCaseClass] = _
+    private var caseClassWithNestingState: ValueState[CustomCaseClassWithNesting] = _
+    private var collectionState: ValueState[List[CustomCaseClass]] = _
+    private var tryState: ValueState[Try[CustomCaseClass]] = _
+    private var tryFailureState: ValueState[Try[CustomCaseClass]] = _
+    private var optionState: ValueState[Option[CustomCaseClass]] = _
+    private var optionNoneState: ValueState[Option[CustomCaseClass]] = _
+    private var eitherLeftState: ValueState[Either[CustomCaseClass, String]] = _
+    private var eitherRightState: ValueState[Either[CustomCaseClass, String]] = _
+    private var enumOneState: ValueState[CustomEnum] = _
+    private var enumThreeState: ValueState[CustomEnum] = _
+
+    override def open(parameters: Configuration): Unit = {
+      caseClassState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomCaseClass](
+          "caseClassState", createTypeInformation[CustomCaseClass]))
+      caseClassWithNestingState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomCaseClassWithNesting](
+          "caseClassWithNestingState", createTypeInformation[CustomCaseClassWithNesting]))
+      collectionState = getRuntimeContext.getState(
+        new ValueStateDescriptor[List[CustomCaseClass]](
+          "collectionState", createTypeInformation[List[CustomCaseClass]]))
+      tryState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Try[CustomCaseClass]](
+          "tryState", createTypeInformation[Try[CustomCaseClass]]))
+      tryFailureState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Try[CustomCaseClass]](
+          "tryFailureState", createTypeInformation[Try[CustomCaseClass]]))
+      optionState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Option[CustomCaseClass]](
+          "optionState", createTypeInformation[Option[CustomCaseClass]]))
+      optionNoneState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Option[CustomCaseClass]](
+          "optionNoneState", createTypeInformation[Option[CustomCaseClass]]))
+      eitherLeftState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Either[CustomCaseClass, String]](
+          "eitherLeftState", createTypeInformation[Either[CustomCaseClass, String]]))
+      eitherRightState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Either[CustomCaseClass, String]](
+          "eitherRightState", createTypeInformation[Either[CustomCaseClass, String]]))
+      enumOneState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomEnum](
+          "enumOneState", createTypeInformation[CustomEnum]))
+      enumThreeState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomEnum](
+          "enumThreeState", createTypeInformation[CustomEnum]))
+    }
+
+    override def flatMap(in: (Long, Long), collector: Collector[(Long, Long)]): Unit = {
+      caseClassState.update(CustomCaseClass(in._1.toString, in._2 * 2))
+      caseClassWithNestingState.update(
+        CustomCaseClassWithNesting(in._1, CustomCaseClass(in._1.toString, in._2 * 2)))
+      collectionState.update(List(CustomCaseClass(in._1.toString, in._2 * 2)))
+      tryState.update(Try(CustomCaseClass(in._1.toString, in._2 * 5)))
+      tryFailureState.update(Failure(new RuntimeException))
+      optionState.update(Some(CustomCaseClass(in._1.toString, in._2 * 2)))
+      optionNoneState.update(None)
+      eitherLeftState.update(Left(CustomCaseClass(in._1.toString, in._2 * 2)))
+      eitherRightState.update(Right((in._1 * 3).toString))
+      // each enum state gets its own value (THREE previously overwrote enumOneState)
+      enumOneState.update(CustomEnum.ONE)
+      enumThreeState.update(CustomEnum.THREE)
+
+      collector.collect(in)
+    }
+  }
+
+}

Reply via email to