[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992797#comment-15992797 ]
ASF GitHub Bot commented on FLINK-5969:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3778#discussion_r114294138
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java ---
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.2 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ *
+ * <p>The tests will time out if they don't see the required number of successful checks within
+ * a time limit.
+ */
+public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase {
+    private static final int NUM_SOURCE_ELEMENTS = 4;
+
+    /**
+     * This has to be manually executed to create the savepoint on Flink 1.2.
+     */
+    @Test
+    @Ignore
+    public void testCreateSavepointOnFlink12() throws Exception {
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        // we only test the memory state backend for now
+        env.setStateBackend(new MemoryStateBackend());
+        env.enableCheckpointing(500);
+        env.setParallelism(4);
+        env.setMaxParallelism(4);
+
+        env
+            .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+            .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+            .keyBy(0)
+            .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+            .keyBy(0)
+            .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+            .keyBy(0)
+            .transform(
+                "custom_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+            .keyBy(0)
+            .transform(
+                "timely_stateful_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
+            .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+        executeAndSavepoint(
+            env,
+            "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint",
+            new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+    }
+
+    /**
+     * This has to be manually executed to create the savepoint on Flink 1.2.
+     */
+    @Test
+    @Ignore
+    public void testCreateSavepointOnFlink12WithRocksDB() throws Exception {
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        RocksDBStateBackend rocksBackend = new RocksDBStateBackend(new MemoryStateBackend());
+        env.setStateBackend(rocksBackend);
+        env.enableCheckpointing(500);
+        env.setParallelism(4);
+        env.setMaxParallelism(4);
+
+        env
+            .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+            .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+            .keyBy(0)
+            .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+            .keyBy(0)
+            .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+            .keyBy(0)
+            .transform(
+                "custom_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+            .keyBy(0)
+            .transform(
+                "timely_stateful_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
+            .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+        executeAndSavepoint(
+            env,
+            "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb",
+            new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+    }
+
+    @Test
+    public void testSavepointRestoreFromFlink12() throws Exception {
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        // we only test the memory state backend for now
+        env.setStateBackend(new MemoryStateBackend());
+        env.enableCheckpointing(500);
+        env.setParallelism(4);
+        env.setMaxParallelism(4);
+
+        env
+            .addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+            .flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+            .keyBy(0)
+            .flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+            .keyBy(0)
+            .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+            .keyBy(0)
+            .transform(
+                "custom_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
+            .keyBy(0)
+            .transform(
+                "timely_stateful_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
+            .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+        restoreAndExecute(
+            env,
+            getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"),
+            new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
+            new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+    }
+
+    @Test
+    public void testSavepointRestoreFromFlink12FromRocksDB() throws Exception {
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        // restore with the RocksDB state backend
+        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+        env.enableCheckpointing(500);
+        env.setParallelism(4);
+        env.setMaxParallelism(4);
+
+        env
+            .addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+            .flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+            .keyBy(0)
+            .flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+            .keyBy(0)
+            .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+            .keyBy(0)
+            .transform(
+                "custom_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
+            .keyBy(0)
+            .transform(
+                "timely_stateful_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
+            .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+        restoreAndExecute(
+            env,
+            getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb"),
+            new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
+            new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+            new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+    }
+
+    private static class LegacyCheckpointedSource
+            implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
+
+        public static String CHECKPOINTED_STRING = "Here be dragons!";
+
+        private static final long serialVersionUID = 1L;
+
+        private volatile boolean isRunning = true;
+
+        private final int numElements;
+
+        public LegacyCheckpointedSource(int numElements) {
+            this.numElements = numElements;
+        }
+
+        @Override
+        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+            ctx.emitWatermark(new Watermark(0));
+
+            synchronized (ctx.getCheckpointLock()) {
+                for (long i = 0; i < numElements; i++) {
+                    ctx.collect(new Tuple2<>(i, i));
+                }
+            }
+
+            // don't emit a final watermark so that we don't trigger the registered event-time timers
+            while (isRunning) {
+                Thread.sleep(20);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+
+        @Override
+        public void restoreState(String state) throws Exception {
+            assertEquals(CHECKPOINTED_STRING, state);
+        }
+
+        @Override
+        public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+            return CHECKPOINTED_STRING;
+        }
+    }
+
+    private static class CheckingRestoringSource
+            extends RichSourceFunction<Tuple2<Long, Long>>
+            implements CheckpointedRestoring<String> {
+
+        private static final long serialVersionUID = 1L;
+
+        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
+            CheckingRestoringSource.class + "_RESTORE_CHECK";
+
+        private volatile boolean isRunning = true;
+
+        private final int numElements;
+
+        private String restoredState;
+
+        public CheckingRestoringSource(int numElements) {
+            this.numElements = numElements;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+
+            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+        }
+
+        @Override
+        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+            assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+
+            // immediately trigger any set timers
+            ctx.emitWatermark(new Watermark(1000));
+
+            synchronized (ctx.getCheckpointLock()) {
+                for (long i = 0; i < numElements; i++) {
+                    ctx.collect(new Tuple2<>(i, i));
+                }
+            }
+
+            while (isRunning) {
+                Thread.sleep(20);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+
+        @Override
+        public void restoreState(String state) throws Exception {
+            restoredState = state;
+        }
+    }
+
+    public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+            implements Checkpointed<Tuple2<String, Long>> {
+
+        private static final long serialVersionUID = 1L;
+
+        public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+            new Tuple2<>("hello", 42L);
+
+        @Override
+        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+            out.collect(value);
+        }
+
+        @Override
+        public void restoreState(Tuple2<String, Long> state) throws Exception {
+        }
+
+        @Override
+        public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+            return CHECKPOINTED_TUPLE;
+        }
+    }
+
+    public static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+            implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+        private static final long serialVersionUID = 1L;
+
+        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
+            CheckingRestoringFlatMap.class + "_RESTORE_CHECK";
+
+        private transient Tuple2<String, Long> restoredState;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+
+            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+        }
+
+        @Override
+        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+            out.collect(value);
+
+            assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+        }
+
+        @Override
+        public void restoreState(Tuple2<String, Long> state) throws Exception {
+            restoredState = state;
+        }
+    }
+
+    public static class LegacyCheckpointedFlatMapWithKeyedState
+            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+            implements Checkpointed<Tuple2<String, Long>> {
+
+        private static final long serialVersionUID = 1L;
+
+        public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+            new Tuple2<>("hello", 42L);
+
+        private final ValueStateDescriptor<Long> stateDescriptor =
+            new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+        @Override
+        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+            out.collect(value);
+
+            getRuntimeContext().getState(stateDescriptor).update(value.f1);
+
+            assertEquals(value.f1, getRuntimeContext().getState(stateDescriptor).value());
--- End diff --
Isn't this always true? We just set the value after all.
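Reduced to a minimal sketch (hypothetical class, not part of this PR), the pattern in question is:

import static org.junit.Assert.assertEquals;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical reduction of the pattern under discussion: the state is
// written and immediately read back for the same key, so the assertion
// holds for any functioning state backend and verifies nothing about
// state restored from the savepoint.
public class UpdateThenAssertFlatMap
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private static final long serialVersionUID = 1L;

    private final ValueStateDescriptor<Long> stateDescriptor =
        new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

    @Override
    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
        ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
        state.update(value.f1);
        // Trivially true: the value was written one line above.
        assertEquals(value.f1, state.value());
        out.collect(value);
    }
}

A check of restored state would instead have to read the value before the first write for that key.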
> Add savepoint backwards compatibility tests from 1.2 to 1.3
> -----------------------------------------------------------
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.3.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
> - {{StatefulUDFSavepointMigrationITCase}}
> - {{*MigrationTest}}
> - {{AbstractKeyedCEPPatternOperator}}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)