This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c6f14ca5b10d30232966bce2f52e9f9128346473 Author: dengziming <[email protected]> AuthorDate: Sat Dec 18 10:21:29 2021 +0800 [FLINK-25368][connectors/kafka] Substitute KafkaConsumer with AdminClient in OffsetsInitializer --- .../source/enumerator/KafkaSourceEnumerator.java | 117 +++++++++++++++------ .../enumerator/initializer/OffsetsInitializer.java | 12 +-- .../source/enumerator/KafkaEnumeratorTest.java | 8 -- .../initializer/OffsetsInitializerTest.java | 4 +- 4 files changed, 89 insertions(+), 52 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java index 8b89b7e..27dbd22 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java @@ -31,12 +31,13 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; import org.apache.flink.util.FlinkRuntimeException; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; +import java.util.stream.Collectors; /** The enumerator class for Kafka source. */ @Internal @@ -81,7 +83,6 @@ public class KafkaSourceEnumerator private final String consumerGroupId; // Lazily instantiated or mutable fields. - private KafkaConsumer<byte[], byte[]> consumer; private AdminClient adminClient; // This flag will be marked as true if periodically partition discovery is disabled AND the @@ -147,7 +148,6 @@ public class KafkaSourceEnumerator */ @Override public void start() { - consumer = getKafkaConsumer(); adminClient = getKafkaAdminClient(); if (partitionDiscoveryIntervalMs > 0) { LOG.info( @@ -200,9 +200,6 @@ public class KafkaSourceEnumerator @Override public void close() { - if (consumer != null) { - consumer.close(); - } if (adminClient != null) { adminClient.close(); } @@ -402,25 +399,6 @@ public class KafkaSourceEnumerator return new PartitionChange(fetchedPartitions, removedPartitions); } - private KafkaConsumer<byte[], byte[]> getKafkaConsumer() { - Properties consumerProps = new Properties(); - deepCopyProperties(properties, consumerProps); - // set client id prefix - String clientIdPrefix = - consumerProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key()); - consumerProps.setProperty( - ConsumerConfig.CLIENT_ID_CONFIG, clientIdPrefix + "-enumerator-consumer"); - consumerProps.setProperty( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - ByteArrayDeserializer.class.getName()); - consumerProps.setProperty( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - ByteArrayDeserializer.class.getName()); - // Disable auto topic creation. - consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); - return new KafkaConsumer<>(consumerProps); - } - private AdminClient getKafkaAdminClient() { Properties adminClientProps = new Properties(); deepCopyProperties(properties, adminClientProps); @@ -434,7 +412,7 @@ public class KafkaSourceEnumerator private OffsetsInitializer.PartitionOffsetsRetriever getOffsetsRetriever() { String groupId = properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG); - return new PartitionOffsetsRetrieverImpl(consumer, adminClient, groupId); + return new PartitionOffsetsRetrieverImpl(adminClient, groupId); } /** @@ -514,13 +492,10 @@ public class KafkaSourceEnumerator @VisibleForTesting public static class PartitionOffsetsRetrieverImpl implements OffsetsInitializer.PartitionOffsetsRetriever, AutoCloseable { - private final KafkaConsumer<?, ?> consumer; private final AdminClient adminClient; private final String groupId; - public PartitionOffsetsRetrieverImpl( - KafkaConsumer<?, ?> consumer, AdminClient adminClient, String groupId) { - this.consumer = consumer; + public PartitionOffsetsRetrieverImpl(AdminClient adminClient, String groupId) { this.adminClient = adminClient; this.groupId = groupId; } @@ -547,6 +522,7 @@ public class KafkaSourceEnumerator }) .get(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new FlinkRuntimeException( "Interrupted while listing offsets for consumer group " + groupId, e); } catch (ExecutionException e) { @@ -558,25 +534,96 @@ public class KafkaSourceEnumerator } } + /** + * List offsets for the specified partitions and OffsetSpec. This operation enables to find + * the beginning offset, end offset as well as the offset matching a timestamp in + * partitions. + * + * @see KafkaAdminClient#listOffsets(Map) + * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up. + * @return The list offsets result. + */ + private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets( + Map<TopicPartition, OffsetSpec> topicPartitionOffsets) { + try { + return adminClient + .listOffsets(topicPartitionOffsets) + .all() + .thenApply( + result -> { + Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> + offsets = new HashMap<>(); + result.forEach( + (tp, listOffsetsResultInfo) -> { + if (listOffsetsResultInfo != null) { + offsets.put(tp, listOffsetsResultInfo); + } + }); + return offsets; + }) + .get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new FlinkRuntimeException( + "Interrupted while listing offsets for topic partitions: " + + topicPartitionOffsets, + e); + } catch (ExecutionException e) { + throw new FlinkRuntimeException( + "Failed to list offsets for topic partitions: " + + topicPartitionOffsets + + " due to", + e); + } + } + + private Map<TopicPartition, Long> listOffsets( + Collection<TopicPartition> partitions, OffsetSpec offsetSpec) { + return listOffsets( + partitions.stream() + .collect( + Collectors.toMap( + partition -> partition, __ -> offsetSpec))) + .entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, entry -> entry.getValue().offset())); + } + @Override public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { - return consumer.endOffsets(partitions); + return listOffsets(partitions, OffsetSpec.latest()); } @Override public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { - return consumer.beginningOffsets(partitions); + return listOffsets(partitions, OffsetSpec.earliest()); } @Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes( Map<TopicPartition, Long> timestampsToSearch) { - return consumer.offsetsForTimes(timestampsToSearch); + return listOffsets( + timestampsToSearch.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + OffsetSpec.forTimestamp( + entry.getValue())))) + .entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + new OffsetAndTimestamp( + entry.getValue().offset(), + entry.getValue().timestamp(), + entry.getValue().leaderEpoch()))); } @Override public void close() throws Exception { - consumer.close(Duration.ZERO); adminClient.close(Duration.ZERO); } } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java index 9774cb8..6de272d 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java @@ -24,7 +24,6 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; @@ -34,7 +33,8 @@ import java.util.Collection; import java.util.Map; /** - * A interface for users to specify the starting / stopping offset of a {@link KafkaPartitionSplit}. + * An interface for users to specify the starting / stopping offset of a {@link + * KafkaPartitionSplit}. * * @see ReaderHandledOffsetsInitializer * @see SpecifiedOffsetsInitializer @@ -85,13 +85,13 @@ public interface OffsetsInitializer extends Serializable { */ Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions); - /** @see KafkaConsumer#endOffsets(Collection) */ + /** List end offsets for the specified partitions. */ Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions); - /** @see KafkaConsumer#beginningOffsets(Collection) */ + /** List beginning offsets for the specified partitions. */ Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions); - /** @see KafkaConsumer#offsetsForTimes(Map) */ + /** List offsets matching a timestamp for the specified partitions. */ Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes( Map<TopicPartition, Long> timestampsToSearch); } @@ -130,7 +130,7 @@ public interface OffsetsInitializer extends Serializable { * @param timestamp the timestamp to start the consumption. * @return an {@link OffsetsInitializer} which initializes the offsets based on the given * timestamp. - * @see KafkaConsumer#offsetsForTimes(Map) + * @see KafkaAdminClient#listOffsets(Map) */ static OffsetsInitializer timestamp(long timestamp) { return new TimestampOffsetsInitializer(timestamp); diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java index 9216172..2a3200b 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java @@ -32,7 +32,6 @@ import org.apache.flink.mock.Whitebox; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.AfterClass; @@ -313,15 +312,8 @@ public class KafkaEnumeratorTest { defaultTimeoutMs, Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs")); - KafkaConsumer<?, ?> consumer = - (KafkaConsumer<?, ?>) Whitebox.getInternalState(enumerator, "consumer"); - assertNotNull(consumer); - clientId = (String) Whitebox.getInternalState(consumer, "clientId"); assertNotNull(clientId); assertTrue(clientId.startsWith(clientIdPrefix)); - assertEquals( - (long) defaultTimeoutMs, - Whitebox.getInternalState(consumer, "requestTimeoutMs")); } } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java index 6c696f0..4b74bbe 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java @@ -51,9 +51,7 @@ public class OffsetsInitializerTest { KafkaSourceTestEnv.setupTopic(TOPIC2, false, false, KafkaSourceTestEnv::getRecordsForTopic); retriever = new KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl( - KafkaSourceTestEnv.getConsumer(), - KafkaSourceTestEnv.getAdminClient(), - KafkaSourceTestEnv.GROUP_ID); + KafkaSourceTestEnv.getAdminClient(), KafkaSourceTestEnv.GROUP_ID); } @AfterClass
