[ https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371867#comment-16371867 ]
ASF GitHub Bot commented on FLINK-8735: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5552#discussion_r169741385 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java --- @@ -0,0 +1,658 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Triggerable; +import 
org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +/** + * Migration ITCases for a stateful job. The tests are parameterized to cover + * migrating for multiple previous Flink versions, as well as for different state backends. + */ +@RunWith(Parameterized.class) +public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters () { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + /** + * TODO to generate savepoints for a specific Flink version / backend type, + * TODO change these values accordingly, e.g. 
to generate for 1.4 with RocksDB, + * TODO set as (MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME) + */ + private final MigrationVersion flinkGenerateSavepointVersion = MigrationVersion.v1_4; + private final String flinkGenerateSavepointBackendType = StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME; + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + /** + * Manually run this to write binary snapshot data. + */ + @Test + @Ignore + public void writeSavepoint() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (flinkGenerateSavepointBackendType) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); + break; + case StateBackendLoader.MEMORY_STATE_BACKEND_NAME: + env.setStateBackend(new MemoryStateBackend()); + break; + default: + throw new UnsupportedOperationException(); + } + + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + env + .addSource(new CheckpointedNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS)).uid("CheckpointedSource1") + .keyBy(0) + .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap1") + .keyBy(0) + .transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new TimelyStatefulOperator()).uid("TimelyStatefulOperator1") + .addSink(new AccumulatorCountingSink<>()); + + env + .addSource(new CheckpointedParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS)).uid("CheckpointedSource2") + .keyBy(0) + .flatMap(new 
KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap2") + .keyBy(0) + .transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new TimelyStatefulOperator()).uid("TimelyStatefulOperator2") + .addSink(new AccumulatorCountingSink<>()); + + executeAndSavepoint( + env, + "src/test/resources/" + getSavepointPath(flinkGenerateSavepointVersion, flinkGenerateSavepointBackendType), + new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2)); + } + + @Test + public void testSavepointRestore() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); + break; + case StateBackendLoader.MEMORY_STATE_BACKEND_NAME: + env.setStateBackend(new MemoryStateBackend()); + break; + default: + throw new UnsupportedOperationException(); + } + + env.enableCheckpointing(500); + env.setParallelism(parallelism); + env.setMaxParallelism(parallelism); + + env + .addSource(new CheckingRestoringNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS)).uid("CheckpointedSource1") + .keyBy(0) + .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap1") + .keyBy(0) + .transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator1") + .addSink(new AccumulatorCountingSink<>()); + + env + .addSource(new CheckingRestoringParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS)).uid("CheckpointedSource2") + .keyBy(0) + .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap2") + .keyBy(0) + 
.transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator2") + .addSink(new AccumulatorCountingSink<>()); + + restoreAndExecute( + env, + getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)), + new Tuple2<>(CheckingRestoringNonParallelSourceWithListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1), + new Tuple2<>(CheckingRestoringParallelSourceWithUnionListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, parallelism), + new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2), + new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2)); + } + + private String getSavepointPath(MigrationVersion savepointVersion, String backendType) { + switch (backendType) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + return "new-stateful-udf-migration-itcase-flink" + savepointVersion + "-rocksdb-savepoint"; + case StateBackendLoader.MEMORY_STATE_BACKEND_NAME: + return "new-stateful-udf-migration-itcase-flink" + savepointVersion + "-savepoint"; + default: + throw new UnsupportedOperationException(); + } + } + + private static class CheckpointedNonParallelSourceWithListState + implements SourceFunction<Tuple2<Long, Long>>, CheckpointedFunction { + + final static ListStateDescriptor<String> stateDescriptor = + new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE); + + final static String checkpointedString = "Here be dragons!"; + final static String checkpointedString1 = "Here be more dragons!"; + final static String 
checkpointedString2 = "Here be yet more dragons!"; + final static String checkpointedString3 = "Here be the mostest dragons!"; + + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + + private final int numElements; + + private transient ListState<String> unionListState; + + public CheckpointedNonParallelSourceWithListState(int numElements) { + this.numElements = numElements; + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + unionListState.clear(); + unionListState.add(checkpointedString); + unionListState.add(checkpointedString1); + unionListState.add(checkpointedString2); + unionListState.add(checkpointedString3); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + unionListState = context.getOperatorStateStore().getListState( + stateDescriptor); + } + + @Override + public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { + + ctx.emitWatermark(new Watermark(0)); + + synchronized (ctx.getCheckpointLock()) { + for (long i = 0; i < numElements; i++) { + ctx.collect(new Tuple2<>(i, i)); + } + } + + // don't emit a final watermark so that we don't trigger the registered event-time + // timers + while (isRunning) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + private static class CheckingRestoringNonParallelSourceWithListState + extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringNonParallelSourceWithListState.class + "_RESTORE_CHECK"; + + private volatile boolean isRunning = true; + + private final int numElements; + + public CheckingRestoringNonParallelSourceWithListState(int numElements) { + this.numElements = numElements; + } + + @Override + public void 
snapshotState(FunctionSnapshotContext context) throws Exception { + + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + ListState<String> unionListState = context.getOperatorStateStore().getListState( + CheckpointedNonParallelSourceWithListState.stateDescriptor); + + if (context.isRestored()) { + assertThat(unionListState.get(), + containsInAnyOrder( + CheckpointedNonParallelSourceWithListState.checkpointedString, + CheckpointedNonParallelSourceWithListState.checkpointedString1, + CheckpointedNonParallelSourceWithListState.checkpointedString2, + CheckpointedNonParallelSourceWithListState.checkpointedString3)); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + } else { + throw new RuntimeException( + "This source should always be restored because it's only used when restoring from a savepoint."); + } + } + + @Override + public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { + + // immediately trigger any set timers + ctx.emitWatermark(new Watermark(1000)); + + synchronized (ctx.getCheckpointLock()) { + for (long i = 0; i < numElements; i++) { + ctx.collect(new Tuple2<>(i, i)); + } + } + + while (isRunning) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + private static class CheckpointedParallelSourceWithUnionListState + extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction { + + final static ListStateDescriptor<String> stateDescriptor = + new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE); + + final static String[] checkpointedStrings = { + "Here be dragons!", + "Here be more dragons!", + "Here be yet more dragons!", + "Here be the mostest dragons!" 
}; + + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + + private final int numElements; + + private transient ListState<String> unionListState; + + public CheckpointedParallelSourceWithUnionListState(int numElements) { + this.numElements = numElements; + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + unionListState.clear(); + + for (String s : checkpointedStrings) { + if (s.hashCode() % getRuntimeContext().getNumberOfParallelSubtasks() == getRuntimeContext().getIndexOfThisSubtask()) { + unionListState.add(s); + } + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + unionListState = context.getOperatorStateStore().getUnionListState( + stateDescriptor); + } + + @Override + public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { + + ctx.emitWatermark(new Watermark(0)); + + synchronized (ctx.getCheckpointLock()) { + for (long i = 0; i < numElements; i++) { + if (i % getRuntimeContext().getNumberOfParallelSubtasks() == getRuntimeContext().getIndexOfThisSubtask()) { + ctx.collect(new Tuple2<>(i, i)); + } + } + } + + // don't emit a final watermark so that we don't trigger the registered event-time + // timers + while (isRunning) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + private static class CheckingRestoringParallelSourceWithUnionListState + extends RichParallelSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringParallelSourceWithUnionListState.class + "_RESTORE_CHECK"; + + private volatile boolean isRunning = true; + + private final int numElements; + + public CheckingRestoringParallelSourceWithUnionListState(int numElements) { + this.numElements = numElements; + } + + @Override + 
public void snapshotState(FunctionSnapshotContext context) throws Exception { + + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + ListState<String> unionListState = context.getOperatorStateStore().getUnionListState( + CheckpointedNonParallelSourceWithListState.stateDescriptor); + + if (context.isRestored()) { + assertThat(unionListState.get(), + containsInAnyOrder(CheckpointedParallelSourceWithUnionListState.checkpointedStrings)); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + } else { + throw new RuntimeException( + "This source should always be restored because it's only used when restoring from a savepoint."); + } + } + + @Override + public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { + + // immediately trigger any set timers + ctx.emitWatermark(new Watermark(1000)); + + synchronized (ctx.getCheckpointLock()) { + for (long i = 0; i < numElements; i++) { + if (i % getRuntimeContext().getNumberOfParallelSubtasks() == getRuntimeContext().getIndexOfThisSubtask()) { + ctx.collect(new Tuple2<>(i, i)); + } + } + } + + while (isRunning) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + private static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { --- End diff -- The class naming scheme is inconsistent: the checker classes have "Checking" as a prefix (e.g. CheckingKeyedStateFlatMap), while the setter classes have "Setting" as an infix (e.g. KeyedStateSettingFlatMap).
> Add savepoint migration ITCase that covers operator state > --------------------------------------------------------- > > Key: FLINK-8735 > URL: https://issues.apache.org/jira/browse/FLINK-8735 > Project: Flink > Issue Type: Bug > Components: Tests > Affects Versions: 1.4.0, 1.5.0 > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > Priority: Blocker > Fix For: 1.5.0, 1.4.2 > > > The current {{StatefulJobSavepointMigrationITCase}} does not cover operator > state, meaning state accessed using {{OperatorStateStore}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)