[FLINK-6883] [tests] Add migration tests for Scala jobs

This commit adds migration ITCases for jobs written using the Scala API. An
extra concern for the migration of Scala jobs is that Scala case classes and
collections use anonymously generated serializers, which may affect state
restore.
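For background, here is a minimal sketch (not part of this commit; names are
illustrative) of how the Scala API macro generates anonymous serializer
classes whose names depend on their position in the enclosing class:

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.scala._

    case class Example(a: String, b: Long)

    object AnonSerializerDemo {
      def main(args: Array[String]): Unit = {
        // createTypeInformation is a macro; at this call site it generates an
        // anonymous CaseClassTypeInfo subclass, which in turn creates an
        // anonymous serializer class.
        val info = createTypeInformation[Example]
        println(info.getClass.getName)   // e.g. AnonSerializerDemo$$anon$1
        println(info.createSerializer(new ExecutionConfig).getClass.getName)
      }
    }

Since savepoints in Flink 1.2/1.3 carry these serializers via Java
serialization, the generated class names must stay stable for state restore
to work.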
This closes #4103.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ae09955
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ae09955
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ae09955

Branch: refs/heads/release-1.3
Commit: 6ae09955000783109c526005aab7e8a85f27dddc
Parents: 39c8270
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 11 15:31:42 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 07:21:00 2017 +0200

----------------------------------------------------------------------
 .../api/scala/typeutils/OptionTypeInfo.scala    |   5 +-
 ...gration-itcase-flink1.2-jobmanager-savepoint | Bin 0 -> 92818 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 92818 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 ...gration-itcase-flink1.2-jobmanager-savepoint | Bin 0 -> 92818 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 92818 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../scala/migration/MigrationTestTypes.scala    |  28 ++
 .../ScalaSerializersMigrationTest.scala         | 110 +++++++
 .../StatefulJobSavepointMigrationITCase.scala   | 303 +++++++++++++++++++
 12 files changed, 445 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 73fe580..d2e66a5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Public, PublicEvolving}
+import org.apache.flink.annotation.{Public, PublicEvolving, VisibleForTesting}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
@@ -85,4 +85,7 @@ class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformatio
   override def hashCode: Int = {
     elemTypeInfo.hashCode()
   }
+
+  @VisibleForTesting
+  def getElemTypeInfo: TypeInformation[A] = elemTypeInfo
 }
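The @VisibleForTesting accessor added above exists so tests can reach the
otherwise-private element type info of an OptionTypeInfo. A minimal usage
sketch (assumed test code, not part of the commit):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.typeutils.OptionTypeInfo

    val optionInfo = createTypeInformation[Option[String]]
      .asInstanceOf[OptionTypeInfo[String, Option[String]]]
    // The element type info wrapped by the Option, usable in assertions:
    println(optionInfo.getElemTypeInfo.getClass.getName)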

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
new file mode 100644
index 0000000..3d0f8c5
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
new file mode 100644
index 0000000..5a763df
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
new file mode 100644
index 0000000..e183e51
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..612bc1b
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
new file mode 100644
index 0000000..9b90ac8
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
new file mode 100644
index 0000000..99777a1
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
new file mode 100644
index 0000000..6adf433
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..d9eaa72
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
new file mode 100644
index 0000000..4ae57c4
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+case class CustomCaseClass(a: String, b: Long)
+
+case class CustomCaseClassWithNesting(a: Long, nested: CustomCaseClass)
+
+object CustomEnum extends Enumeration {
+  type CustomEnum = Value
+  val ONE, TWO, THREE, FOUR = Value
+}
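A note on the enumeration above: the CustomEnum type alias is what lets the
type-information macro treat enum values as a first-class type (resolved, in
Flink's Scala API, by EnumValueTypeInfo). A small sketch of how these test
types are picked up, assuming the package above is on the classpath:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.migration._
    import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum

    // Each call generates an anonymous TypeInformation subclass at this site.
    val enumInfo = createTypeInformation[CustomEnum]
    val caseClassInfo = createTypeInformation[CustomCaseClass]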

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
new file mode 100644
index 0000000..a2edc90
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils._
+import org.junit.{Assert, Test}
+
+import scala.util.Try
+
+class ScalaSerializersMigrationTest {
+
+  /**
+   * Verifies that the generated classnames for anonymous Scala serializers remain the same.
+   *
+   * The classnames in this test are collected from running the same type information generation
+   * code in previous version branches. They should not change across different Flink versions.
+   */
+  @Test
+  def testStableAnonymousClassnameGeneration(): Unit = {
+    val caseClassInfo = createTypeInformation[CustomCaseClass]
+    val caseClassWithNestingInfo =
+      createTypeInformation[CustomCaseClassWithNesting]
+        .asInstanceOf[CaseClassTypeInfo[_]]
+    val traversableInfo =
+      createTypeInformation[List[CustomCaseClass]]
+        .asInstanceOf[TraversableTypeInfo[_,_]]
+    val tryInfo =
+      createTypeInformation[Try[CustomCaseClass]]
+        .asInstanceOf[TryTypeInfo[_,_]]
+    val optionInfo =
+      createTypeInformation[Option[CustomCaseClass]]
+        .asInstanceOf[OptionTypeInfo[_,_]]
+    val eitherInfo =
+      createTypeInformation[Either[CustomCaseClass, String]]
+        .asInstanceOf[EitherTypeInfo[_,_,_]]
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$8",
+      caseClassInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$8$$anon$1",
+      caseClassInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9",
+      caseClassWithNestingInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$3",
+      caseClassWithNestingInfo.createSerializer(new ExecutionConfig).getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$10",
+      caseClassWithNestingInfo.getTypeAt("nested").getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$10$$anon$2",
+      caseClassWithNestingInfo.getTypeAt("nested")
+        .createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$16",
+      traversableInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$16$$anon$12",
+      traversableInfo.createSerializer(new ExecutionConfig).getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$11",
+      traversableInfo.elementTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$11$$anon$4",
+      traversableInfo.elementTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$13",
+      tryInfo.elemTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$13$$anon$5",
+      tryInfo.elemTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$14",
+      optionInfo.getElemTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$14$$anon$6",
+      optionInfo.getElemTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$15",
+      eitherInfo.leftTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$15$$anon$7",
+      eitherInfo.leftTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+  }
+}
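The $$anon$N suffixes asserted above come from scalac's positional numbering
of anonymous classes within the enclosing class, which is why even reordering
the createTypeInformation calls in this test would shift every expected name.
A quick illustrative check (output shown is an example, not a guarantee):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.migration.CustomCaseClass

    val info = createTypeInformation[CustomCaseClass]
    // The runtime class is an anonymous subclass whose name encodes its
    // position, e.g. "...ScalaSerializersMigrationTest$$anon$8" above.
    println(info.getClass.getName)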

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae09955/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
new file mode 100644
index 0000000..1e67042
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+import java.util
+
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.memory.MemoryStateBackend
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.runtime.state.{AbstractStateBackend, FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.streaming.util.migration.MigrationVersion
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Assume, Ignore, Test}
+
+import scala.util.{Failure, Properties, Try}
+
+object StatefulJobSavepointMigrationITCase {
+
+  @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+  def parameters: util.Collection[(MigrationVersion, String)] = {
+    util.Arrays.asList(
+      (MigrationVersion.v1_2, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_2, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_3, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME))
+  }
+
+  // TODO to generate savepoints for a specific Flink version / backend type,
+  // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
+  // TODO set as (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME)
+  val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_3
+  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME
+
+  val SCALA_VERSION: String = {
+    val versionString = Properties.versionString.split(" ")(1)
+    versionString.substring(0, versionString.lastIndexOf("."))
+  }
+
+  val NUM_ELEMENTS = 4
+}
+
+/**
+ * ITCase for migrating Scala state types across different Flink versions.
+ */
+@RunWith(classOf[Parameterized])
+class StatefulJobSavepointMigrationITCase(
+    migrationVersionAndBackend: (MigrationVersion, String))
+  extends SavepointMigrationTestBase with Serializable {
+
+  @Ignore
+  @Test
+  def testCreateSavepoint(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
+      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+        env.setStateBackend(new MemoryStateBackend())
+      case _ => throw new UnsupportedOperationException
+    }
+
+    env.enableCheckpointing(500)
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    env
+      .addSource(
+        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
+      .keyBy(
+        new KeySelector[(Long, Long), Long] {
+          override def getKey(value: (Long, Long)): Long = value._1
+        }
+      )
+      .flatMap(new StatefulFlatMapper)
+      .addSink(new AccumulatorCountingSink)
+
+    executeAndSavepoint(
+      env,
+      s"src/test/resources/stateful-scala" +
+        s"${StatefulJobSavepointMigrationITCase.SCALA_VERSION}" +
+        s"-udf-migration-itcase-flink" +
+        s"${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_VER}" +
+        s"-${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
+      new Tuple2(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
+      )
+    )
+  }
+
+  @Test
+  def testRestoreSavepoint(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    migrationVersionAndBackend._2 match {
+      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+        env.setStateBackend(new MemoryStateBackend())
+      case _ => throw new UnsupportedOperationException
+    }
+
+    env.enableCheckpointing(500)
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    env
+      .addSource(
+        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
+      .keyBy(
+        new KeySelector[(Long, Long), Long] {
+          override def getKey(value: (Long, Long)): Long = value._1
+        }
+      )
+      .flatMap(new StatefulFlatMapper)
+      .addSink(new AccumulatorCountingSink)
+
+    restoreAndExecute(
+      env,
+      SavepointMigrationTestBase.getResourceFilename(
+        s"stateful-scala${StatefulJobSavepointMigrationITCase.SCALA_VERSION}" +
+          s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
+          s"-${migrationVersionAndBackend._2}-savepoint"),
+      new Tuple2(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
+    )
+  }
+
+  @SerialVersionUID(1L)
+  private object CheckpointedSource {
+    var CHECKPOINTED_STRING = "Here be dragons!"
+  }
+
+  @SerialVersionUID(1L)
+  private class CheckpointedSource(val numElements: Int)
+    extends SourceFunction[(Long, Long)] with CheckpointedFunction {
+
+    private var isRunning = true
+    private var state: ListState[CustomCaseClass] = _
+
+    @throws[Exception]
+    override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
+      ctx.emitWatermark(new Watermark(0))
+      ctx.getCheckpointLock synchronized {
+        var i = 0L
+        while (i < numElements) {
+          ctx.collect((i, i))
+          i += 1
+        }
+      }
+      // don't emit a final watermark so that we don't trigger the registered event-time
+      // timers
+      while (isRunning) Thread.sleep(20)
+    }
+
+    def cancel() {
+      isRunning = false
+    }
+
+    override def initializeState(context: FunctionInitializationContext): Unit = {
+      state = context.getOperatorStateStore.getOperatorState(
+        new ListStateDescriptor[CustomCaseClass](
+          "sourceState", createTypeInformation[CustomCaseClass]))
+    }
+
+    override def snapshotState(context: FunctionSnapshotContext): Unit = {
+      state.clear()
+      state.add(CustomCaseClass("Here be dragons!", 123))
+    }
+  }
+
+  @SerialVersionUID(1L)
+  private object AccumulatorCountingSink {
+    var NUM_ELEMENTS_ACCUMULATOR = classOf[AccumulatorCountingSink[_]] + "_NUM_ELEMENTS"
+  }
+
+  @SerialVersionUID(1L)
+  private class AccumulatorCountingSink[T] extends RichSinkFunction[T] {
+
+    private var count: Int = 0
+
+    @throws[Exception]
+    override def open(parameters: Configuration) {
+      super.open(parameters)
+      getRuntimeContext.addAccumulator(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, new IntCounter)
+    }
+
+    @throws[Exception]
+    def invoke(value: T) {
+      count += 1
+      getRuntimeContext.getAccumulator(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR).add(1)
+    }
+  }
+
+  class StatefulFlatMapper extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
+
+    private var caseClassState: ValueState[CustomCaseClass] = _
+    private var caseClassWithNestingState: ValueState[CustomCaseClassWithNesting] = _
+    private var collectionState: ValueState[List[CustomCaseClass]] = _
+    private var tryState: ValueState[Try[CustomCaseClass]] = _
+    private var tryFailureState: ValueState[Try[CustomCaseClass]] = _
+    private var optionState: ValueState[Option[CustomCaseClass]] = _
+    private var optionNoneState: ValueState[Option[CustomCaseClass]] = _
+    private var eitherLeftState: ValueState[Either[CustomCaseClass, String]] = _
+    private var eitherRightState: ValueState[Either[CustomCaseClass, String]] = _
+    private var enumOneState: ValueState[CustomEnum] = _
+    private var enumThreeState: ValueState[CustomEnum] = _
+
+    override def open(parameters: Configuration): Unit = {
+      caseClassState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomCaseClass](
+          "caseClassState", createTypeInformation[CustomCaseClass]))
+      caseClassWithNestingState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomCaseClassWithNesting](
+          "caseClassWithNestingState", createTypeInformation[CustomCaseClassWithNesting]))
+      collectionState = getRuntimeContext.getState(
+        new ValueStateDescriptor[List[CustomCaseClass]](
+          "collectionState", createTypeInformation[List[CustomCaseClass]]))
+      tryState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Try[CustomCaseClass]](
+          "tryState", createTypeInformation[Try[CustomCaseClass]]))
+      tryFailureState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Try[CustomCaseClass]](
+          "tryFailureState", createTypeInformation[Try[CustomCaseClass]]))
+      optionState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Option[CustomCaseClass]](
+          "optionState", createTypeInformation[Option[CustomCaseClass]]))
+      optionNoneState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Option[CustomCaseClass]](
+          "optionNoneState", createTypeInformation[Option[CustomCaseClass]]))
+      eitherLeftState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Either[CustomCaseClass, String]](
+          "eitherLeftState", createTypeInformation[Either[CustomCaseClass, String]]))
+      eitherRightState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Either[CustomCaseClass, String]](
+          "eitherRightState", createTypeInformation[Either[CustomCaseClass, String]]))
+      enumOneState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomEnum](
+          "enumOneState", createTypeInformation[CustomEnum]))
+      enumThreeState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomEnum](
+          "enumThreeState", createTypeInformation[CustomEnum]))
+    }
+
+    override def flatMap(in: (Long, Long), collector: Collector[(Long, Long)]): Unit = {
+      caseClassState.update(CustomCaseClass(in._1.toString, in._2 * 2))
+      caseClassWithNestingState.update(
+        CustomCaseClassWithNesting(in._1, CustomCaseClass(in._1.toString, in._2 * 2)))
+      collectionState.update(List(CustomCaseClass(in._1.toString, in._2 * 2)))
+      tryState.update(Try(CustomCaseClass(in._1.toString, in._2 * 5)))
+      tryFailureState.update(Failure(new RuntimeException))
+      optionState.update(Some(CustomCaseClass(in._1.toString, in._2 * 2)))
+      optionNoneState.update(None)
+      eitherLeftState.update(Left(CustomCaseClass(in._1.toString, in._2 * 2)))
+      eitherRightState.update(Right((in._1 * 3).toString))
+      enumOneState.update(CustomEnum.ONE)
+      enumThreeState.update(CustomEnum.THREE)
+
+      collector.collect(in)
+    }
+  }
+
+}
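To generate savepoints for a new Flink version or backend, follow the TODO in
the companion object: adjust the two constants, remove @Ignore from
testCreateSavepoint, and run it. The written savepoint lands under
src/test/resources with the Scala binary version, Flink version, and backend
name embedded in the file name, matching the resources added by this commit.
A sketch of the constants to change (shown for 1.3 with RocksDB, as
committed):

    val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_3
    val GENERATE_SAVEPOINT_BACKEND_TYPE: String =
      AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME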
