This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2be15338c00e75361566fe3010091ea61f0af645
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Apr 1 14:47:50 2021 +0800

    [FLINK-21817][connector/kafka] Remove mapping of reader id to split assignments from Kafka enumerator and its state
---
 .../flink/connector/kafka/source/KafkaSource.java  |   2 +-
 .../source/enumerator/KafkaSourceEnumState.java    |  13 ++-
 .../enumerator/KafkaSourceEnumStateSerializer.java |  67 ++++++++++++-
 .../source/enumerator/KafkaSourceEnumerator.java   |  24 ++---
 .../source/enumerator/KafkaEnumeratorTest.java     |  52 ++++++++--
 .../KafkaSourceEnumStateSerializerTest.java        | 108 +++++++++++++++++++++
 6 files changed, 232 insertions(+), 34 deletions(-)

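The core change: the enumerator checkpoint no longer records which reader owns
which split (a Map<Integer, Set<KafkaPartitionSplit>> keyed by reader id); it now
keeps only the Set<TopicPartition> assigned so far, and the state serializer
version is bumped from 0 to 1 with a compatibility path for old checkpoints. A
minimal round-trip sketch of the new state (illustrative only; the
KafkaSourceEnumState constructor is package-private, so code like this must live
in the org.apache.flink.connector.kafka.source.enumerator package, as the new
test below does):

    KafkaSourceEnumStateSerializer serializer = new KafkaSourceEnumStateSerializer();
    KafkaSourceEnumState state =
            new KafkaSourceEnumState(
                    new HashSet<>(
                            Arrays.asList(
                                    new TopicPartition("topic-a", 0),
                                    new TopicPartition("topic-a", 1))));
    byte[] bytes = serializer.serialize(state);
    // On restore, Flink passes the version recorded with the bytes; here version 1.
    KafkaSourceEnumState restored = serializer.deserialize(serializer.getVersion(), bytes);
    // restored.assignedPartitions() equals state.assignedPartitions()
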
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index b7dd718..dd1c3bf 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -170,7 +170,7 @@ public class KafkaSource<OUT>
                 stoppingOffsetsInitializer,
                 props,
                 enumContext,
-                checkpoint.getCurrentAssignment());
+                checkpoint.assignedPartitions());
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java
index b3cf1b9..c93973f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java
@@ -18,20 +18,19 @@
 
 package org.apache.flink.connector.kafka.source.enumerator;
 
-import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.kafka.common.TopicPartition;
 
-import java.util.Map;
 import java.util.Set;
 
 /** The state of Kafka source enumerator. */
 public class KafkaSourceEnumState {
-    private final Map<Integer, Set<KafkaPartitionSplit>> currentAssignment;
+    private final Set<TopicPartition> assignedPartitions;
 
-    KafkaSourceEnumState(Map<Integer, Set<KafkaPartitionSplit>> currentAssignment) {
-        this.currentAssignment = currentAssignment;
+    KafkaSourceEnumState(Set<TopicPartition> assignedPartitions) {
+        this.assignedPartitions = assignedPartitions;
     }
 
-    public Map<Integer, Set<KafkaPartitionSplit>> getCurrentAssignment() {
-        return currentAssignment;
+    public Set<TopicPartition> assignedPartitions() {
+        return assignedPartitions;
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
index 2569eef..3b5d97e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
@@ -23,7 +23,14 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -35,7 +42,10 @@ import java.util.Set;
 public class KafkaSourceEnumStateSerializer
         implements SimpleVersionedSerializer<KafkaSourceEnumState> {
 
-    private static final int CURRENT_VERSION = 0;
+    private static final int VERSION_0 = 0;
+    private static final int VERSION_1 = 1;
+
+    private static final int CURRENT_VERSION = VERSION_1;
 
     @Override
     public int getVersion() {
@@ -44,22 +54,69 @@ public class KafkaSourceEnumStateSerializer
 
     @Override
     public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
-        return SerdeUtils.serializeSplitAssignments(
-                enumState.getCurrentAssignment(), new KafkaPartitionSplitSerializer());
+        return serializeTopicPartitions(enumState.assignedPartitions());
     }
 
     @Override
     public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
-        if (version == 0) {
+        if (version == CURRENT_VERSION) {
+            final Set<TopicPartition> assignedPartitions = deserializeTopicPartitions(serialized);
+            return new KafkaSourceEnumState(assignedPartitions);
+        }
+
+        // Backward compatibility
+        if (version == VERSION_0) {
             Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
                     SerdeUtils.deserializeSplitAssignments(
                             serialized, new KafkaPartitionSplitSerializer(), HashSet::new);
-            return new KafkaSourceEnumState(currentPartitionAssignment);
+            Set<TopicPartition> currentAssignedSplits = new HashSet<>();
+            currentPartitionAssignment.forEach(
+                    (reader, splits) ->
+                            splits.forEach(
+                                    split -> currentAssignedSplits.add(split.getTopicPartition())));
+            return new KafkaSourceEnumState(currentAssignedSplits);
         }
+
         throw new IOException(
                 String.format(
                         "The bytes are serialized with version %d, "
                                 + "while this deserializer only supports 
version up to %d",
                         version, CURRENT_VERSION));
     }
+
+    private static byte[] serializeTopicPartitions(Collection<TopicPartition> topicPartitions)
+            throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+
+            out.writeInt(topicPartitions.size());
+            for (TopicPartition tp : topicPartitions) {
+                out.writeUTF(tp.topic());
+                out.writeInt(tp.partition());
+            }
+            out.flush();
+
+            return baos.toByteArray();
+        }
+    }
+
+    private static Set<TopicPartition> deserializeTopicPartitions(byte[] serializedTopicPartitions)
+            throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions);
+                DataInputStream in = new DataInputStream(bais)) {
+
+            final int numPartitions = in.readInt();
+            Set<TopicPartition> topicPartitions = new HashSet<>(numPartitions);
+            for (int i = 0; i < numPartitions; i++) {
+                final String topic = in.readUTF();
+                final int partition = in.readInt();
+                topicPartitions.add(new TopicPartition(topic, partition));
+            }
+            if (in.available() > 0) {
+                throw new IOException("Unexpected trailing bytes in serialized topic partitions");
+            }
+
+            return topicPartitions;
+        }
+    }
 }
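
For reference, the version-1 payload written by serializeTopicPartitions above is
simply a partition count followed by (topic, partition) pairs in the standard
DataOutputStream encoding. A worked example of the byte layout (a sketch, not
part of the commit):

    // Serializing the single partition ("topic", 0) yields 15 bytes:
    out.writeInt(1);          // 00 00 00 01             int32 partition count
    out.writeUTF("topic");    // 00 05 74 6f 70 69 63    uint16 length + "topic"
    out.writeInt(0);          // 00 00 00 00             int32 partition id
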
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 1f3f3c6..529df52 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -71,13 +71,16 @@ public class KafkaSourceEnumerator
      * worker thread.
      */
     private final Set<TopicPartition> discoveredPartitions;
-    /** The current assignment by reader id. Only accessed by the coordinator thread. */
-    private final Map<Integer, Set<KafkaPartitionSplit>> readerIdToSplitAssignments;
+
+    /** Partitions that have been assigned to readers. */
+    private final Set<TopicPartition> assignedPartitions;
+
     /**
     * The discovered and initialized partition splits that are waiting for owner reader to be
      * ready.
      */
     private final Map<Integer, Set<KafkaPartitionSplit>> pendingPartitionSplitAssignment;
+
     /** The consumer group id used for this KafkaSource. */
     private final String consumerGroupId;
 
@@ -98,7 +101,7 @@ public class KafkaSourceEnumerator
                 stoppingOffsetInitializer,
                 properties,
                 context,
-                new HashMap<>());
+                Collections.emptySet());
     }
 
     public KafkaSourceEnumerator(
@@ -107,7 +110,7 @@ public class KafkaSourceEnumerator
             OffsetsInitializer stoppingOffsetInitializer,
             Properties properties,
             SplitEnumeratorContext<KafkaPartitionSplit> context,
-            Map<Integer, Set<KafkaPartitionSplit>> currentSplitsAssignments) {
+            Set<TopicPartition> assignedPartitions) {
         this.subscriber = subscriber;
         this.startingOffsetInitializer = startingOffsetInitializer;
         this.stoppingOffsetInitializer = stoppingOffsetInitializer;
@@ -115,10 +118,8 @@ public class KafkaSourceEnumerator
         this.context = context;
 
         this.discoveredPartitions = new HashSet<>();
-        this.readerIdToSplitAssignments = new HashMap<>(currentSplitsAssignments);
-        this.readerIdToSplitAssignments.forEach(
-                (reader, splits) ->
-                        splits.forEach(s -> discoveredPartitions.add(s.getTopicPartition())));
+        this.assignedPartitions = new HashSet<>(assignedPartitions);
+        discoveredPartitions.addAll(this.assignedPartitions);
         this.pendingPartitionSplitAssignment = new HashMap<>();
         this.partitionDiscoveryIntervalMs =
                 KafkaSourceOptions.getOption(
@@ -183,7 +184,7 @@ public class KafkaSourceEnumerator
 
     @Override
     public KafkaSourceEnumState snapshotState() throws Exception {
-        return new KafkaSourceEnumState(readerIdToSplitAssignments);
+        return new KafkaSourceEnumState(assignedPartitions);
     }
 
     @Override
@@ -279,9 +280,8 @@ public class KafkaSourceEnumerator
         incrementalAssignment.forEach(
                 (readerOwner, newPartitionSplits) -> {
                     // Update the split assignment.
-                    readerIdToSplitAssignments
-                            .computeIfAbsent(readerOwner, r -> new HashSet<>())
-                            .addAll(newPartitionSplits);
+                    newPartitionSplits.forEach(
+                            split -> assignedPartitions.add(split.getTopicPartition()));
                     // Clear the pending splits for the reader owner.
                     pendingPartitionSplitAssignment.remove(readerOwner);
                     // Sends NoMoreSplitsEvent to the readers if there is no more partition splits
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index c951f0b..679f732 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -255,7 +255,7 @@ public class KafkaEnumeratorTest {
     public void testWorkWithPreexistingAssignments() throws Throwable {
         final MockSplitEnumeratorContext<KafkaPartitionSplit> context1 =
                 new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
-        Map<Integer, Set<KafkaPartitionSplit>> preexistingAssignments;
+        Set<TopicPartition> preexistingAssignments;
         try (KafkaSourceEnumerator enumerator =
                 createEnumerator(context1, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
             startEnumeratorAndRegisterReaders(context1, enumerator);
@@ -299,7 +299,7 @@ public class KafkaEnumeratorTest {
                         context,
                         ENABLE_PERIODIC_PARTITION_DISCOVERY,
                         PRE_EXISTING_TOPICS,
-                        Collections.emptyMap(),
+                        Collections.emptySet(),
                         properties)) {
             enumerator.start();
 
@@ -324,6 +324,30 @@ public class KafkaEnumeratorTest {
         }
     }
 
+    @Test
+    public void testSnapshotState() throws Throwable {
+        final MockSplitEnumeratorContext<KafkaPartitionSplit> context =
+                new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+
+        final KafkaSourceEnumerator enumerator = createEnumerator(context, false);
+        enumerator.start();
+
+        // No reader is registered, so the state should be empty
+        final KafkaSourceEnumState state1 = enumerator.snapshotState();
+        assertTrue(state1.assignedPartitions().isEmpty());
+
+        registerReader(context, enumerator, READER0);
+        registerReader(context, enumerator, READER1);
+        context.runNextOneTimeCallable();
+
+        // The state should contain splits assigned to READER0 and READER1
+        final KafkaSourceEnumState state2 = enumerator.snapshotState();
+        verifySplitAssignmentWithPartitions(
+                getExpectedAssignments(
+                        new HashSet<>(Arrays.asList(READER0, READER1)), PRE_EXISTING_TOPICS),
+                state2.assignedPartitions());
+    }
+
     // -------------- some common startup sequence ---------------
 
     private void startEnumeratorAndRegisterReaders(
@@ -370,7 +394,7 @@ public class KafkaEnumeratorTest {
                 enumContext,
                 enablePeriodicPartitionDiscovery,
                 topics,
-                Collections.emptyMap(),
+                Collections.emptySet(),
                 new Properties());
     }
 
@@ -382,7 +406,7 @@ public class KafkaEnumeratorTest {
             MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
             boolean enablePeriodicPartitionDiscovery,
             Collection<String> topicsToSubscribe,
-            Map<Integer, Set<KafkaPartitionSplit>> currentAssignments,
+            Set<TopicPartition> assignedPartitions,
             Properties overrideProperties) {
         // Use a TopicPatternSubscriber so that no exception if a subscribed topic hasn't been
         // created yet.
@@ -408,7 +432,7 @@ public class KafkaEnumeratorTest {
                 stoppingOffsetsInitializer,
                 props,
                 enumContext,
-                currentAssignments);
+                assignedPartitions);
     }
 
     // ---------------------
@@ -475,11 +499,21 @@ public class KafkaEnumeratorTest {
         return expectedAssignments;
     }
 
-    private Map<Integer, Set<KafkaPartitionSplit>> asEnumState(
-            Map<Integer, List<KafkaPartitionSplit>> assignments) {
-        Map<Integer, Set<KafkaPartitionSplit>> enumState = new HashMap<>();
+    private void verifySplitAssignmentWithPartitions(
+            Map<Integer, Set<TopicPartition>> expectedAssignment,
+            Set<TopicPartition> actualTopicPartitions) {
+        final Set<TopicPartition> allTopicPartitionsFromAssignment = new HashSet<>();
+        expectedAssignment.forEach(
+                (reader, topicPartitions) ->
+                        allTopicPartitionsFromAssignment.addAll(topicPartitions));
+        assertEquals(allTopicPartitionsFromAssignment, actualTopicPartitions);
+    }
+
+    private Set<TopicPartition> asEnumState(Map<Integer, List<KafkaPartitionSplit>> assignments) {
+        Set<TopicPartition> enumState = new HashSet<>();
         assignments.forEach(
-                (reader, assignment) -> enumState.put(reader, new HashSet<>(assignment)));
+                (reader, assignment) ->
+                        assignment.forEach(split -> enumState.add(split.getTopicPartition())));
         return enumState;
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java
new file mode 100644
index 0000000..d55f833
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.enumerator;
+
+import org.apache.flink.connector.base.source.utils.SerdeUtils;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link KafkaSourceEnumStateSerializer}. */
+public class KafkaSourceEnumStateSerializerTest {
+
+    private static final int NUM_READERS = 10;
+    private static final String TOPIC_PREFIX = "topic-";
+    private static final int NUM_PARTITIONS_PER_TOPIC = 10;
+    private static final long STARTING_OFFSET = KafkaPartitionSplit.EARLIEST_OFFSET;
+
+    @Test
+    public void testEnumStateSerde() throws IOException {
+        final KafkaSourceEnumState state = new KafkaSourceEnumState(constructTopicPartitions());
+        final KafkaSourceEnumStateSerializer serializer = new KafkaSourceEnumStateSerializer();
+
+        final byte[] bytes = serializer.serialize(state);
+
+        final KafkaSourceEnumState restoredState =
+                serializer.deserialize(serializer.getVersion(), bytes);
+
+        assertEquals(state.assignedPartitions(), restoredState.assignedPartitions());
+    }
+
+    @Test
+    public void testBackwardCompatibility() throws IOException {
+
+        final Set<TopicPartition> topicPartitions = constructTopicPartitions();
+        final Map<Integer, Set<KafkaPartitionSplit>> splitAssignments =
+                toSplitAssignments(topicPartitions);
+
+        // Create bytes the way KafkaSourceEnumStateSerializer version 0 serialized state
+        final byte[] bytes =
+                SerdeUtils.serializeSplitAssignments(
+                        splitAssignments, new KafkaPartitionSplitSerializer());
+
+        // Deserialize the bytes above with KafkaSourceEnumStateSerializer version 1 to
+        // check backward compatibility
+        final KafkaSourceEnumState kafkaSourceEnumState =
+                new KafkaSourceEnumStateSerializer().deserialize(0, bytes);
+
+        assertEquals(topicPartitions, kafkaSourceEnumState.assignedPartitions());
+    }
+
+    private Set<TopicPartition> constructTopicPartitions() {
+        // Create topic partitions for readers. Reader i will be assigned
+        // NUM_PARTITIONS_PER_TOPIC splits from topic "topic-{i}", which has
+        // NUM_PARTITIONS_PER_TOPIC partitions. In total, NUM_READERS *
+        // NUM_PARTITIONS_PER_TOPIC partitions will be created.
+        Set<TopicPartition> topicPartitions = new HashSet<>();
+        for (int readerId = 0; readerId < NUM_READERS; readerId++) {
+            for (int partition = 0; partition < NUM_PARTITIONS_PER_TOPIC; partition++) {
+                topicPartitions.add(new TopicPartition(TOPIC_PREFIX + readerId, partition));
+            }
+        }
+        return topicPartitions;
+    }
+
+    private Map<Integer, Set<KafkaPartitionSplit>> toSplitAssignments(
+            Collection<TopicPartition> topicPartitions) {
+        // Assign splits to readers according to topic name. For example, topic "topic-5"
+        // will be assigned to the reader with ID=5
+        Map<Integer, Set<KafkaPartitionSplit>> splitAssignments = new HashMap<>();
+        topicPartitions.forEach(
+                (tp) ->
+                        splitAssignments
+                                .computeIfAbsent(
+                                        Integer.valueOf(
+                                                tp.topic().substring(TOPIC_PREFIX.length())),
+                                        HashSet::new)
+                                .add(new KafkaPartitionSplit(tp, STARTING_OFFSET)));
+        return splitAssignments;
+    }
+}
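
As a usage note: a checkpoint taken before this change was written with serializer
version 0, and Flink hands that recorded version back to the serializer on restore,
so the compatibility branch above flattens the old reader-id map into a partition
set. A minimal sketch (oldBytes is hypothetical version-0 data; same package as
above):

    KafkaSourceEnumState restored =
            new KafkaSourceEnumStateSerializer().deserialize(/* version */ 0, oldBytes);
    Set<TopicPartition> partitions = restored.assignedPartitions();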
