This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ff270e1d977cfcd6db1804ae16fc9002184fab3
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Thu May 28 15:11:27 2020 +0200

    [FLINK-17376] Don't restore from Flink <= 1.2 state in Kafka connector
    
    This code was using the deprecated getSerializableListState(), which we
    want to remove.
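    
    For context, a minimal, self-contained sketch of the pattern the connector
    keeps after this change: union list state registered under an explicit name
    with explicit type information, instead of the deprecated
    getSerializableListState(DEFAULT_OPERATOR_STATE_NAME). The class, the state
    name, and the Tuple2<String, Long> element type below are illustrative, not
    the connector's own.
    
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    
    // Sketch only; names are illustrative, not the connector's.
    public class UnionOffsetStateSketch implements CheckpointedFunction {
    
        private transient ListState<Tuple2<String, Long>> unionOffsetStates;
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // state is registered under an explicit name with explicit type
            // information, so no default-named serializable list state is needed
            unionOffsetStates = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>(
                    "topic-partition-offset-states",
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
    
            if (context.isRestored()) {
                for (Tuple2<String, Long> offset : unionOffsetStates.get()) {
                    // union list state: each subtask sees the offsets of all
                    // subtasks here and filters down to the ones it should own
                }
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            unionOffsetStates.clear();
            unionOffsetStates.add(Tuple2.of("topic-0", 42L)); // this subtask's offsets
        }
    }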
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java   |  40 ++++-----------------
 .../kafka/FlinkKafkaConsumerBaseMigrationTest.java |  38 --------------------
 ...er-migration-test-flink1.3-empty-state-snapshot | Bin 473 -> 0 bytes
 ...kafka-consumer-migration-test-flink1.3-snapshot | Bin 1255 -> 0 bytes
 4 files changed, 6 insertions(+), 72 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index e8c0ae4..0e8c261 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -36,7 +36,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -193,12 +192,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
        /** Accessor for state in the operator state backend. */
        private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
 
-       /**
-        * Flag indicating whether the consumer is restored from older state written with Flink 1.1 or 1.2.
-        * When the current run is restored from older state, partition discovery is disabled.
-        */
-       private boolean restoredFromOldState;
-
        /** Discovery loop, executed in a separate thread. */
        private transient volatile Thread discoveryLoopThread;
 
@@ -566,17 +559,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                        }
 
                        for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
-                               if (!restoredFromOldState) {
-                                       // seed the partition discoverer with the union state while filtering out
-                                       // restored partitions that should not be subscribed by this subtask
-                                       if (KafkaTopicPartitionAssigner.assign(
-                                               restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
-                                                       == getRuntimeContext().getIndexOfThisSubtask()){
-                                               subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
-                                       }
-                               } else {
-                                       // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
-                                       // in this case, just use the restored state as the subscribed partitions
+                               // seed the partition discoverer with the union state while filtering out
+                               // restored partitions that should not be subscribed by this subtask
+                               if (KafkaTopicPartitionAssigner.assign(
+                                       restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
+                                               == getRuntimeContext().getIndexOfThisSubtask()){
                                        subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                                }
                        }
@@ -907,27 +894,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
                OperatorStateStore stateStore = context.getOperatorStateStore();
 
-               ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
-                       stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
-
                this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,
                        createStateSerializer(getRuntimeContext().getExecutionConfig())));
 
-               if (context.isRestored() && !restoredFromOldState) {
+               if (context.isRestored()) {
                        restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
 
-                       // migrate from 1.2 state, if there is any
-                       for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
-                               restoredFromOldState = true;
-                               unionOffsetStates.add(kafkaOffset);
-                       }
-                       oldRoundRobinListState.clear();
-
-                       if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
-                               throw new IllegalArgumentException(
-                                       "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
-                       }
-
                        // populate actual holder for restored state
                        for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 2dfe1af..79fa356 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.SerializedValue;
 
-import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -57,8 +56,6 @@ import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.doAnswer;
@@ -99,7 +96,6 @@ public class FlinkKafkaConsumerBaseMigrationTest {
        @Parameterized.Parameters(name = "Migration Savepoint: {0}")
        public static Collection<MigrationVersion> parameters () {
                return Arrays.asList(
-                       MigrationVersion.v1_3,
                        MigrationVersion.v1_4,
                        MigrationVersion.v1_5,
                        MigrationVersion.v1_6,
@@ -325,40 +321,6 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                consumerOperator.cancel();
        }
 
-       /**
-        * Test restoring from savepoints before version Flink 1.3 should fail if discovery is enabled.
-        */
-       @Test
-       public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() throws Exception {
-               assumeTrue(testMigrateVersion == MigrationVersion.v1_3);
-
-               final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
-
-               final DummyFlinkKafkaConsumer<String> consumerFunction =
-                       new DummyFlinkKafkaConsumer<>(TOPICS, partitions, 1000L); // discovery enabled
-
-               StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
-                       new StreamSource<>(consumerFunction);
-
-               final AbstractStreamOperatorTestHarness<String> testHarness =
-                       new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
-
-               testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-               testHarness.setup();
-
-               // restore state from binary snapshot file; should fail since discovery is enabled
-               try {
-                       testHarness.initializeState(
-                               OperatorSnapshotUtil.getResourceFilename(
-                                       "kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
-
-                       fail("Restore from savepoints from version before Flink 1.3.x should have failed if discovery is enabled.");
-               } catch (Exception e) {
-                       Assert.assertTrue(e instanceof IllegalArgumentException);
-               }
-       }
-
        // ------------------------------------------------------------------------
 
        private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot
deleted file mode 100644
index 1a5aad1..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot and /dev/null differ
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot
deleted file mode 100644
index dc820ef..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot and /dev/null differ
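
For readers following the second hunk in FlinkKafkaConsumerBase.java: union list
state hands every subtask the complete union of all subtasks' entries on restore,
which is why the code first seeds the discoverer and then filters by subtask
index. Below is a runnable sketch of that seed-and-filter pattern; assignPartition
is a hypothetical modulo-based stand-in for KafkaTopicPartitionAssigner.assign.

import java.util.HashMap;
import java.util.Map;

// Sketch only, not the connector's code. The one property that matters is that
// assignPartition is deterministic and identical on every subtask, so the
// subtasks split the restored union state between them without coordination.
public class UnionStateFilterSketch {

    static int assignPartition(String topicPartition, int numSubtasks) {
        // deterministic partition -> subtask mapping, agreed on by all subtasks
        return Math.abs(topicPartition.hashCode() % numSubtasks);
    }

    public static void main(String[] args) {
        // stands in for the restored union state: every subtask sees all entries
        Map<String, Long> restoredUnionState = new HashMap<>();
        restoredUnionState.put("topic-0", 42L);
        restoredUnionState.put("topic-1", 7L);
        restoredUnionState.put("topic-2", 99L);

        int numSubtasks = 2;
        int thisSubtask = 0;

        // keep only the partitions this subtask is responsible for
        Map<String, Long> subscribed = new HashMap<>();
        for (Map.Entry<String, Long> entry : restoredUnionState.entrySet()) {
            if (assignPartition(entry.getKey(), numSubtasks) == thisSubtask) {
                subscribed.put(entry.getKey(), entry.getValue());
            }
        }
        System.out.println("subtask " + thisSubtask + " subscribes to " + subscribed);
    }
}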
