This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2be15338c00e75361566fe3010091ea61f0af645
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Apr 1 14:47:50 2021 +0800

    [FLINK-21817][connector/kafka] Remove mapping of reader id to split assignments from Kafka enumerator and its state
---
 .../flink/connector/kafka/source/KafkaSource.java  |   2 +-
 .../source/enumerator/KafkaSourceEnumState.java    |  13 ++-
 .../enumerator/KafkaSourceEnumStateSerializer.java |  67 ++++++++++++-
 .../source/enumerator/KafkaSourceEnumerator.java   |  24 ++---
 .../source/enumerator/KafkaEnumeratorTest.java     |  52 ++++++++--
 .../KafkaSourceEnumStateSerializerTest.java        | 108 +++++++++++++++++++++
 6 files changed, 232 insertions(+), 34 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index b7dd718..dd1c3bf 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -170,7 +170,7 @@ public class KafkaSource<OUT>
                 stoppingOffsetsInitializer,
                 props,
                 enumContext,
-                checkpoint.getCurrentAssignment());
+                checkpoint.assignedPartitions());
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java
index b3cf1b9..c93973f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java
@@ -18,20 +18,19 @@
 
 package org.apache.flink.connector.kafka.source.enumerator;
 
-import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.kafka.common.TopicPartition;
 
-import java.util.Map;
 import java.util.Set;
 
 /** The state of Kafka source enumerator. */
 public class KafkaSourceEnumState {
-    private final Map<Integer, Set<KafkaPartitionSplit>> currentAssignment;
+    private final Set<TopicPartition> assignedPartitions;
 
-    KafkaSourceEnumState(Map<Integer, Set<KafkaPartitionSplit>> currentAssignment) {
-        this.currentAssignment = currentAssignment;
+    KafkaSourceEnumState(Set<TopicPartition> assignedPartitions) {
+        this.assignedPartitions = assignedPartitions;
     }
 
-    public Map<Integer, Set<KafkaPartitionSplit>> getCurrentAssignment() {
-        return currentAssignment;
+    public Set<TopicPartition> assignedPartitions() {
+        return assignedPartitions;
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
index 2569eef..3b5d97e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
@@ -23,7 +23,14 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -35,7 +42,10 @@ import java.util.Set;
 public class KafkaSourceEnumStateSerializer
         implements SimpleVersionedSerializer<KafkaSourceEnumState> {
 
-    private static final int CURRENT_VERSION = 0;
+    private static final int VERSION_0 = 0;
+    private static final int VERSION_1 = 1;
+
+    private static final int CURRENT_VERSION = VERSION_1;
 
     @Override
     public int getVersion() {
@@ -44,22 +54,69 @@ public class KafkaSourceEnumStateSerializer
 
     @Override
     public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
-        return SerdeUtils.serializeSplitAssignments(
-                enumState.getCurrentAssignment(), new KafkaPartitionSplitSerializer());
+        return serializeTopicPartitions(enumState.assignedPartitions());
     }
 
     @Override
     public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
-        if (version == 0) {
+        if (version == CURRENT_VERSION) {
+            final Set<TopicPartition> assignedPartitions = deserializeTopicPartitions(serialized);
+            return new KafkaSourceEnumState(assignedPartitions);
+        }
+
+        // Backward compatibility
+        if (version == VERSION_0) {
             Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
                     SerdeUtils.deserializeSplitAssignments(
                             serialized, new KafkaPartitionSplitSerializer(), HashSet::new);
-            return new KafkaSourceEnumState(currentPartitionAssignment);
+            Set<TopicPartition> currentAssignedSplits = new HashSet<>();
+            currentPartitionAssignment.forEach(
+                    (reader, splits) ->
+                            splits.forEach(
+                                    split -> currentAssignedSplits.add(split.getTopicPartition())));
+            return new KafkaSourceEnumState(currentAssignedSplits);
         }
+
         throw new IOException(
                 String.format(
                         "The bytes are serialized with version %d, "
                                 + "while this deserializer only supports version up to %d",
                         version, CURRENT_VERSION));
     }
+
+    private static byte[] serializeTopicPartitions(Collection<TopicPartition> topicPartitions)
+            throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+
+            out.writeInt(topicPartitions.size());
+            for (TopicPartition tp : topicPartitions) {
+                out.writeUTF(tp.topic());
+                out.writeInt(tp.partition());
+            }
+            out.flush();
+
+            return baos.toByteArray();
+        }
+    }
+
+    private static Set<TopicPartition> deserializeTopicPartitions(byte[] serializedTopicPartitions)
+            throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions);
+                DataInputStream in = new DataInputStream(bais)) {
+
+            final int numPartitions = in.readInt();
+            Set<TopicPartition> topicPartitions = new HashSet<>(numPartitions);
+            for (int i = 0; i < numPartitions; i++) {
+                final String topic = in.readUTF();
+                final int partition = in.readInt();
+                topicPartitions.add(new TopicPartition(topic, partition));
+            }
+            if (in.available() > 0) {
+                throw new IOException("Unexpected trailing bytes in serialized topic partitions");
+            }
+
+            return topicPartitions;
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 1f3f3c6..529df52 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -71,13 +71,16 @@ public class KafkaSourceEnumerator
      * worker thread.
      */
     private final Set<TopicPartition> discoveredPartitions;
-    /** The current assignment by reader id. Only accessed by the coordinator thread. */
-    private final Map<Integer, Set<KafkaPartitionSplit>> readerIdToSplitAssignments;
+
+    /** Partitions that have been assigned to readers. */
+    private final Set<TopicPartition> assignedPartitions;
+
     /**
      * The discovered and initialized partition splits that are waiting for owner reader to be
      * ready.
      */
     private final Map<Integer, Set<KafkaPartitionSplit>> pendingPartitionSplitAssignment;
+
     /** The consumer group id used for this KafkaSource. */
     private final String consumerGroupId;
 
@@ -98,7 +101,7 @@ public class KafkaSourceEnumerator
                 stoppingOffsetInitializer,
                 properties,
                 context,
-                new HashMap<>());
+                Collections.emptySet());
     }
 
     public KafkaSourceEnumerator(
@@ -107,7 +110,7 @@ public class KafkaSourceEnumerator
             OffsetsInitializer stoppingOffsetInitializer,
             Properties properties,
             SplitEnumeratorContext<KafkaPartitionSplit> context,
-            Map<Integer, Set<KafkaPartitionSplit>> currentSplitsAssignments) {
+            Set<TopicPartition> assignedPartitions) {
         this.subscriber = subscriber;
         this.startingOffsetInitializer = startingOffsetInitializer;
         this.stoppingOffsetInitializer = stoppingOffsetInitializer;
@@ -115,10 +118,8 @@ public class KafkaSourceEnumerator
         this.context = context;
 
         this.discoveredPartitions = new HashSet<>();
-        this.readerIdToSplitAssignments = new HashMap<>(currentSplitsAssignments);
-        this.readerIdToSplitAssignments.forEach(
-                (reader, splits) ->
-                        splits.forEach(s -> discoveredPartitions.add(s.getTopicPartition())));
+        this.assignedPartitions = new HashSet<>(assignedPartitions);
+        discoveredPartitions.addAll(this.assignedPartitions);
         this.pendingPartitionSplitAssignment = new HashMap<>();
         this.partitionDiscoveryIntervalMs =
                 KafkaSourceOptions.getOption(
@@ -183,7 +184,7 @@ public class KafkaSourceEnumerator
 
     @Override
     public KafkaSourceEnumState snapshotState() throws Exception {
-        return new KafkaSourceEnumState(readerIdToSplitAssignments);
+        return new KafkaSourceEnumState(assignedPartitions);
     }
 
     @Override
@@ -279,9 +280,8 @@ public class KafkaSourceEnumerator
         incrementalAssignment.forEach(
                 (readerOwner, newPartitionSplits) -> {
                     // Update the split assignment.
-                    readerIdToSplitAssignments
-                            .computeIfAbsent(readerOwner, r -> new HashSet<>())
-                            .addAll(newPartitionSplits);
+                    newPartitionSplits.forEach(
+                            split -> assignedPartitions.add(split.getTopicPartition()));
                     // Clear the pending splits for the reader owner.
                     pendingPartitionSplitAssignment.remove(readerOwner);
                     // Sends NoMoreSplitsEvent to the readers if there is no more partition splits
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index c951f0b..679f732 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -255,7 +255,7 @@ public class KafkaEnumeratorTest {
     public void testWorkWithPreexistingAssignments() throws Throwable {
         final MockSplitEnumeratorContext<KafkaPartitionSplit> context1 =
                 new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
-        Map<Integer, Set<KafkaPartitionSplit>> preexistingAssignments;
+        Set<TopicPartition> preexistingAssignments;
         try (KafkaSourceEnumerator enumerator =
                 createEnumerator(context1, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
             startEnumeratorAndRegisterReaders(context1, enumerator);
@@ -299,7 +299,7 @@ public class KafkaEnumeratorTest {
                         context,
                         ENABLE_PERIODIC_PARTITION_DISCOVERY,
                         PRE_EXISTING_TOPICS,
-                        Collections.emptyMap(),
+                        Collections.emptySet(),
                         properties)) {
             enumerator.start();
 
@@ -324,6 +324,30 @@ public class KafkaEnumeratorTest {
         }
     }
 
+    @Test
+    public void testSnapshotState() throws Throwable {
+        final MockSplitEnumeratorContext<KafkaPartitionSplit> context =
+                new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+
+        final KafkaSourceEnumerator enumerator = createEnumerator(context, false);
+        enumerator.start();
+
+        // No reader is registered, so the state should be empty
+        final KafkaSourceEnumState state1 = enumerator.snapshotState();
+        assertTrue(state1.assignedPartitions().isEmpty());
+
+        registerReader(context, enumerator, READER0);
+        registerReader(context, enumerator, READER1);
+        context.runNextOneTimeCallable();
+
+        // The state should contain splits assigned to READER0 and READER1
+        final KafkaSourceEnumState state2 = enumerator.snapshotState();
+        verifySplitAssignmentWithPartitions(
+                getExpectedAssignments(
+                        new HashSet<>(Arrays.asList(READER0, READER1)), PRE_EXISTING_TOPICS),
+                state2.assignedPartitions());
+    }
+
     // -------------- some common startup sequence ---------------
 
     private void startEnumeratorAndRegisterReaders(
@@ -370,7 +394,7 @@ public class KafkaEnumeratorTest {
                 enumContext,
                 enablePeriodicPartitionDiscovery,
                 topics,
-                Collections.emptyMap(),
+                Collections.emptySet(),
                 new Properties());
     }
 
@@ -382,7 +406,7 @@ public class KafkaEnumeratorTest {
             MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
             boolean enablePeriodicPartitionDiscovery,
             Collection<String> topicsToSubscribe,
-            Map<Integer, Set<KafkaPartitionSplit>> currentAssignments,
+            Set<TopicPartition> assignedPartitions,
             Properties overrideProperties) {
         // Use a TopicPatternSubscriber so that no exception is thrown if a subscribed topic
         // hasn't been created yet.
@@ -408,7 +432,7 @@ public class KafkaEnumeratorTest {
                 stoppingOffsetsInitializer,
                 props,
                 enumContext,
-                currentAssignments);
+                assignedPartitions);
     }
 
     // ---------------------
@@ -475,11 +499,21 @@ public class KafkaEnumeratorTest {
         return expectedAssignments;
     }
 
-    private Map<Integer, Set<KafkaPartitionSplit>> asEnumState(
-            Map<Integer, List<KafkaPartitionSplit>> assignments) {
-        Map<Integer, Set<KafkaPartitionSplit>> enumState = new HashMap<>();
+    private void verifySplitAssignmentWithPartitions(
+            Map<Integer, Set<TopicPartition>> expectedAssignment,
+            Set<TopicPartition> actualTopicPartitions) {
+        final Set<TopicPartition> allTopicPartitionsFromAssignment = new HashSet<>();
+        expectedAssignment.forEach(
+                (reader, topicPartitions) ->
+                        allTopicPartitionsFromAssignment.addAll(topicPartitions));
+        assertEquals(allTopicPartitionsFromAssignment, actualTopicPartitions);
+    }
+
+    private Set<TopicPartition> asEnumState(Map<Integer, List<KafkaPartitionSplit>> assignments) {
+        Set<TopicPartition> enumState = new HashSet<>();
         assignments.forEach(
-                (reader, assignment) -> enumState.put(reader, new HashSet<>(assignment)));
+                (reader, assignment) ->
+                        assignment.forEach(split -> enumState.add(split.getTopicPartition())));
         return enumState;
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java
new file mode 100644
index 0000000..d55f833
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.enumerator;
+
+import org.apache.flink.connector.base.source.utils.SerdeUtils;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link KafkaSourceEnumStateSerializer}. */
+public class KafkaSourceEnumStateSerializerTest {
+
+    private static final int NUM_READERS = 10;
+    private static final String TOPIC_PREFIX = "topic-";
+    private static final int NUM_PARTITIONS_PER_TOPIC = 10;
+    private static final long STARTING_OFFSET = KafkaPartitionSplit.EARLIEST_OFFSET;
+
+    @Test
+    public void testEnumStateSerde() throws IOException {
+        final KafkaSourceEnumState state = new KafkaSourceEnumState(constructTopicPartitions());
+        final KafkaSourceEnumStateSerializer serializer = new KafkaSourceEnumStateSerializer();
+
+        final byte[] bytes = serializer.serialize(state);
+
+        final KafkaSourceEnumState restoredState =
+                serializer.deserialize(serializer.getVersion(), bytes);
+
+        assertEquals(state.assignedPartitions(), restoredState.assignedPartitions());
+    }
+
+    @Test
+    public void testBackwardCompatibility() throws IOException {
+
+        final Set<TopicPartition> topicPartitions = constructTopicPartitions();
+        final Map<Integer, Set<KafkaPartitionSplit>> splitAssignments =
+                toSplitAssignments(topicPartitions);
+
+        // Create bytes in the way that KafkaSourceEnumStateSerializer version 0 serializes state
+        final byte[] bytes =
+                SerdeUtils.serializeSplitAssignments(
+                        splitAssignments, new KafkaPartitionSplitSerializer());
+
+        // Deserialize the bytes above with KafkaSourceEnumStateSerializer version 1 to check
+        // backward compatibility
+        final KafkaSourceEnumState kafkaSourceEnumState =
+                new KafkaSourceEnumStateSerializer().deserialize(0, bytes);
+
+        assertEquals(topicPartitions, kafkaSourceEnumState.assignedPartitions());
+    }
+
+    private Set<TopicPartition> constructTopicPartitions() {
+        // Create topic partitions for readers. Reader i is assigned the NUM_PARTITIONS_PER_TOPIC
+        // partitions of topic "topic-{i}", so NUM_READERS * NUM_PARTITIONS_PER_TOPIC partitions
+        // are created in total.
+        Set<TopicPartition> topicPartitions = new HashSet<>();
+        for (int readerId = 0; readerId < NUM_READERS; readerId++) {
+            for (int partition = 0; partition < NUM_PARTITIONS_PER_TOPIC; partition++) {
+                topicPartitions.add(new TopicPartition(TOPIC_PREFIX + readerId, partition));
+            }
+        }
+        return topicPartitions;
+    }
+
+    private Map<Integer, Set<KafkaPartitionSplit>> toSplitAssignments(
+            Collection<TopicPartition> topicPartitions) {
+        // Assign splits to readers according to the topic name. For example, topic "topic-5"
+        // is assigned to the reader with ID 5.
+        Map<Integer, Set<KafkaPartitionSplit>> splitAssignments = new HashMap<>();
+        topicPartitions.forEach(
+                (tp) ->
+                        splitAssignments
+                                .computeIfAbsent(
+                                        Integer.valueOf(
+                                                tp.topic().substring(TOPIC_PREFIX.length())),
+                                        HashSet::new)
+                                .add(new KafkaPartitionSplit(tp, STARTING_OFFSET)));
+        return splitAssignments;
+    }
+}
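
The version-1 format written by serializeTopicPartitions() above is deliberately simple: a partition count, then one (topic, partition) pair per entry via DataOutputStream. Below is a minimal, self-contained round-trip sketch of that wire format; it uses only plain Java plus the kafka-clients TopicPartition, and the class name V1WireFormatSketch is illustrative only, not part of this commit.

    import org.apache.kafka.common.TopicPartition;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class V1WireFormatSketch {

        public static void main(String[] args) throws IOException {
            Set<TopicPartition> partitions =
                    new HashSet<>(
                            Arrays.asList(
                                    new TopicPartition("topic-0", 0),
                                    new TopicPartition("topic-0", 1),
                                    new TopicPartition("topic-1", 0)));

            // Serialize: partition count, then (topic, partition) per entry,
            // mirroring serializeTopicPartitions() in the diff above.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(baos)) {
                out.writeInt(partitions.size());
                for (TopicPartition tp : partitions) {
                    out.writeUTF(tp.topic());
                    out.writeInt(tp.partition());
                }
            }
            byte[] serialized = baos.toByteArray();

            // Deserialize and verify the round trip.
            Set<TopicPartition> restored = new HashSet<>();
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(serialized))) {
                int numPartitions = in.readInt();
                for (int i = 0; i < numPartitions; i++) {
                    restored.add(new TopicPartition(in.readUTF(), in.readInt()));
                }
            }

            if (!partitions.equals(restored)) {
                throw new AssertionError("round trip mismatch");
            }
            System.out.println("Round-tripped " + restored.size() + " partitions");
        }
    }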

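A version-0 snapshot, by contrast, kept a map from reader id to splits; the compatibility branch in deserialize() reduces it to the union of its values. A toy illustration of that flattening, with plain collections standing in for Flink's KafkaPartitionSplit (the reader ids, topics, and class name V0FlattenSketch are made up for the example):

    import org.apache.kafka.common.TopicPartition;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class V0FlattenSketch {

        public static void main(String[] args) {
            // A version-0 style state: reader id -> partitions assigned to that reader.
            Map<Integer, Set<TopicPartition>> v0State = new HashMap<>();
            v0State.put(0, new HashSet<>(Arrays.asList(new TopicPartition("topic-0", 0))));
            v0State.put(1, new HashSet<>(Arrays.asList(
                    new TopicPartition("topic-1", 0), new TopicPartition("topic-1", 1))));

            // The version-1 state keeps only the union of assigned partitions;
            // the reader-id keys are dropped, as in the compatibility branch above.
            Set<TopicPartition> v1State = new HashSet<>();
            v0State.values().forEach(v1State::addAll);

            System.out.println(v1State.size() + " assigned partitions"); // prints 3
        }
    }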