This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ff270e1d977cfcd6db1804ae16fc9002184fab3 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Thu May 28 15:11:27 2020 +0200 [FLINK-17376] Don't restore from Flink <= 1.2 state in Kafka connector This code was using the deprecated getSerializableListState(), which we want to remove. --- .../connectors/kafka/FlinkKafkaConsumerBase.java | 40 ++++----------------- .../kafka/FlinkKafkaConsumerBaseMigrationTest.java | 38 -------------------- ...er-migration-test-flink1.3-empty-state-snapshot | Bin 473 -> 0 bytes ...kafka-consumer-migration-test-flink1.3-snapshot | Bin 1255 -> 0 bytes 4 files changed, 6 insertions(+), 72 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index e8c0ae4..0e8c261 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -36,7 +36,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -193,12 +192,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti /** Accessor for state in the operator state backend. 
*/ private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates; - /** - * Flag indicating whether the consumer is restored from older state written with Flink 1.1 or 1.2. - * When the current run is restored from older state, partition discovery is disabled. - */ - private boolean restoredFromOldState; - /** Discovery loop, executed in a separate thread. */ private transient volatile Thread discoveryLoopThread; @@ -566,17 +559,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti } for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) { - if (!restoredFromOldState) { - // seed the partition discoverer with the union state while filtering out - // restored partitions that should not be subscribed by this subtask - if (KafkaTopicPartitionAssigner.assign( - restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks()) - == getRuntimeContext().getIndexOfThisSubtask()){ - subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue()); - } - } else { - // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state; - // in this case, just use the restored state as the subscribed partitions + // seed the partition discoverer with the union state while filtering out + // restored partitions that should not be subscribed by this subtask + if (KafkaTopicPartitionAssigner.assign( + restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks()) + == getRuntimeContext().getIndexOfThisSubtask()){ subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue()); } } @@ -907,27 +894,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti OperatorStateStore stateStore = context.getOperatorStateStore(); - ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState = - 
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); - this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME, createStateSerializer(getRuntimeContext().getExecutionConfig()))); - if (context.isRestored() && !restoredFromOldState) { + if (context.isRestored()) { restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator()); - // migrate from 1.2 state, if there is any - for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) { - restoredFromOldState = true; - unionOffsetStates.add(kafkaOffset); - } - oldRoundRobinListState.clear(); - - if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) { - throw new IllegalArgumentException( - "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x."); - } - // populate actual holder for restored state for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) { restoredState.put(kafkaOffset.f0, kafkaOffset.f1); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index 2dfe1af..79fa356 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -38,7 +38,6 @@ import org.apache.flink.streaming.util.OperatorSnapshotUtil; import org.apache.flink.testutils.migration.MigrationVersion; import org.apache.flink.util.SerializedValue; -import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; 
@@ -57,8 +56,6 @@ import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.powermock.api.mockito.PowerMockito.doAnswer; @@ -99,7 +96,6 @@ public class FlinkKafkaConsumerBaseMigrationTest { @Parameterized.Parameters(name = "Migration Savepoint: {0}") public static Collection<MigrationVersion> parameters () { return Arrays.asList( - MigrationVersion.v1_3, MigrationVersion.v1_4, MigrationVersion.v1_5, MigrationVersion.v1_6, @@ -325,40 +321,6 @@ public class FlinkKafkaConsumerBaseMigrationTest { consumerOperator.cancel(); } - /** - * Test restoring from savepoints before version Flink 1.3 should fail if discovery is enabled. - */ - @Test - public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() throws Exception { - assumeTrue(testMigrateVersion == MigrationVersion.v1_3); - - final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet()); - - final DummyFlinkKafkaConsumer<String> consumerFunction = - new DummyFlinkKafkaConsumer<>(TOPICS, partitions, 1000L); // discovery enabled - - StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = - new StreamSource<>(consumerFunction); - - final AbstractStreamOperatorTestHarness<String> testHarness = - new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); - - testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - testHarness.setup(); - - // restore state from binary snapshot file; should fail since discovery is enabled - try { - testHarness.initializeState( - OperatorSnapshotUtil.getResourceFilename( - "kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot")); - - fail("Restore from savepoints from version before Flink 1.3.x should have failed if discovery is enabled."); - } 
catch (Exception e) { - Assert.assertTrue(e instanceof IllegalArgumentException); - } - } - // ------------------------------------------------------------------------ private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> { diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot deleted file mode 100644 index 1a5aad1..0000000 Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot and /dev/null differ diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot deleted file mode 100644 index dc820ef..0000000 Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot and /dev/null differ