[ https://issues.apache.org/jira/browse/FLINK-6883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046544#comment-16046544 ]
ASF GitHub Bot commented on FLINK-6883:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4103#discussion_r121388886
--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointITCase.scala ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+import java.util
+
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.memory.MemoryStateBackend
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.runtime.state.{AbstractStateBackend, FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.streaming.util.migration.MigrationVersion
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Ignore, Test}
+
+import scala.util.{Failure, Try}
+
+object StatefulJobSavepointITCase {
+
+ @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+ def parameters: util.Collection[(MigrationVersion, String)] = {
+ util.Arrays.asList(
+ (MigrationVersion.v1_2, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
+ (MigrationVersion.v1_2, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME),
+ (MigrationVersion.v1_3, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
+ (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME))
+ }
+
+ // TODO to generate savepoints for a specific Flink version / backend type,
+ // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
+ // TODO set as (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME)
+ val GENERATE_SAVEPOINT_VER: MigrationVersion = null
+ val GENERATE_SAVEPOINT_BACKEND_TYPE: String = ""
+
+ val NUM_ELEMENTS = 4
+}
+
+/**
+ * ITCase for migrating Scala state types across different Flink versions.
+ */
+@RunWith(classOf[Parameterized])
+class StatefulJobSavepointITCase(
+ migrationVersionAndBackend: (MigrationVersion, String))
+ extends SavepointMigrationTestBase with Serializable {
+
+ @Ignore
+ @Test
+ def testCreateSavepoint(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ StatefulJobSavepointITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
+ case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+ case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+ env.setStateBackend(new MemoryStateBackend())
+ case _ => throw new UnsupportedOperationException
+ }
+
+ env.enableCheckpointing(500)
+ env.setParallelism(4)
+ env.setMaxParallelism(4)
+
+ env
+ .addSource(
+ new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
+ .keyBy(
+ new KeySelector[(Long, Long), Long] {
+ override def getKey(value: (Long, Long)): Long = value._1
+ }
+ )
+ .flatMap(new StatefulFlatMapper)
+ .addSink(new AccumulatorCountingSink)
+
+ executeAndSavepoint(
+ env,
+ "src/test/resources/stateful-scala-udf-migration-itcase-flink"
+ + StatefulJobSavepointITCase.GENERATE_SAVEPOINT_VER + "-"
+ + StatefulJobSavepointITCase.GENERATE_SAVEPOINT_BACKEND_TYPE + "-savepoint",
+ new Tuple2(
+ AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+ StatefulJobSavepointITCase.NUM_ELEMENTS
+ )
+ )
+ }
+
+ @Test
+ def testRestoreSavepoint(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ migrationVersionAndBackend._2 match {
+ case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+ case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+ env.setStateBackend(new MemoryStateBackend())
+ case _ => throw new UnsupportedOperationException
+ }
+
+ env.enableCheckpointing(500)
+ env.setParallelism(4)
+ env.setMaxParallelism(4)
+
+ env
+ .addSource(
+ new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
+ .keyBy(
+ new KeySelector[(Long, Long), Long] {
+ override def getKey(value: (Long, Long)): Long = value._1
+ }
+ )
+ .flatMap(new StatefulFlatMapper)
+ .addSink(new AccumulatorCountingSink)
+
+ restoreAndExecute(
+ env,
+ SavepointMigrationTestBase.getResourceFilename(
+ "stateful-scala-udf-migration-itcase-flink"
+ + migrationVersionAndBackend._1 + "-"
+ + migrationVersionAndBackend._2 + "-savepoint"),
+ new Tuple2(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 4)
+ )
+ }
+
+ @SerialVersionUID(1L)
+ private object CheckpointedSource {
+ var CHECKPOINTED_STRING = "Here be dragons!"
+ }
+
+ @SerialVersionUID(1L)
+ private class CheckpointedSource(val numElements: Int)
+ extends SourceFunction[(Long, Long)] with CheckpointedFunction {
+
+ private var isRunning = true
+ private var state: ListState[CustomCaseClass] = _
+
+ @throws[Exception]
+ override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
+ ctx.emitWatermark(new Watermark(0))
+ ctx.getCheckpointLock synchronized {
+ var i = 0
+ while (i < numElements) {
+ {
--- End diff ---
Are the additional `{}` blocks needed?
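For illustration only, a minimal sketch (not taken from the PR) of how the loop might read without the inner block; the emitted tuple is a placeholder, since the actual loop body is truncated in the quoted diff:
```scala
// Sketch: the emit loop without the extra `{}` block. The real body is cut
// off in the diff above, so the collected value here is a placeholder.
var i = 0
while (i < numElements) {
  ctx.collect((i.toLong, i.toLong)) // emit under the checkpoint lock
  i += 1
}
```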
> Serializer for collection of Scala case classes are generated with different
> anonymous class names in 1.3
> ---------------------------------------------------------------------------------------------------------
>
> Key: FLINK-6883
> URL: https://issues.apache.org/jira/browse/FLINK-6883
> Project: Flink
> Issue Type: Bug
> Components: Scala API, Type Serialization System
> Affects Versions: 1.3.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Labels: flink-rel-1.3.1-blockers
> Fix For: 1.3.1
>
>
> In the Scala API, serializers are generated using Scala macros (via the
> {{org.apache.flink.streaming.api.scala.createTypeInformation(..)}} util).
> The generated serializers are inner anonymous classes, so their classnames
> will differ depending on when, and in what order, the serializers are generated.
> From 1.1 / 1.2 to Flink 1.3, the generated classname for a serializer of a
> collection of case classes (e.g. {{List[SomeUserCaseClass]}}) will be
> different. In other words, compiling the exact same user code written in the
> Scala API with 1.1 / 1.2 and with 1.3 will result in different classnames.
> This is problematic for restoring older savepoints that have Scala case class
> collections in their state, because the old serializer cannot be recovered
> (due to the generated classname change).
> For now, I've managed to identify that the root cause is that in 1.3 the
> {{TypeSerializer}} base class additionally extends the {{TypeDeserializer}}
> interface. Removing this extension resolves the problem. The actual reason
> why this affects the generated classname is still being investigated.
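>
> For illustration, a minimal hypothetical sketch (the class and object names
> below are made up) that surfaces the macro-generated serializer's classname:
> {code}
> import org.apache.flink.api.common.ExecutionConfig
> import org.apache.flink.api.scala._
>
> case class SomeUserCaseClass(id: Long)
>
> object SerializerNameDemo {
>   def main(args: Array[String]): Unit = {
>     // createTypeInformation is a Scala macro; the serializer it builds for
>     // List[SomeUserCaseClass] is an inner anonymous class.
>     val typeInfo = createTypeInformation[List[SomeUserCaseClass]]
>     val serializer = typeInfo.createSerializer(new ExecutionConfig)
>     // Prints an anonymous classname such as "...$$anon$1$$anon$2"; the
>     // suffix can differ between a 1.2 and a 1.3 build, so a savepoint that
>     // recorded the old name cannot resolve the class on restore.
>     println(serializer.getClass.getName)
>   }
> }
> {code}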
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)