This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6f14ca5b10d30232966bce2f52e9f9128346473
Author: dengziming <[email protected]>
AuthorDate: Sat Dec 18 10:21:29 2021 +0800

    [FLINK-25368][connectors/kafka] Substitute KafkaConsumer with AdminClient in OffsetsInitializer
---
 .../source/enumerator/KafkaSourceEnumerator.java   | 117 +++++++++++++++------
 .../enumerator/initializer/OffsetsInitializer.java |  12 +--
 .../source/enumerator/KafkaEnumeratorTest.java     |   8 --
 .../initializer/OffsetsInitializerTest.java        |   4 +-
 4 files changed, 89 insertions(+), 52 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 8b89b7e..27dbd22 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -31,12 +31,13 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +55,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /** The enumerator class for Kafka source. */
 @Internal
@@ -81,7 +83,6 @@ public class KafkaSourceEnumerator
     private final String consumerGroupId;
 
     // Lazily instantiated or mutable fields.
-    private KafkaConsumer<byte[], byte[]> consumer;
     private AdminClient adminClient;
 
     // This flag will be marked as true if periodically partition discovery is 
disabled AND the
@@ -147,7 +148,6 @@ public class KafkaSourceEnumerator
      */
     @Override
     public void start() {
-        consumer = getKafkaConsumer();
         adminClient = getKafkaAdminClient();
         if (partitionDiscoveryIntervalMs > 0) {
             LOG.info(
@@ -200,9 +200,6 @@ public class KafkaSourceEnumerator
 
     @Override
     public void close() {
-        if (consumer != null) {
-            consumer.close();
-        }
         if (adminClient != null) {
             adminClient.close();
         }
@@ -402,25 +399,6 @@ public class KafkaSourceEnumerator
         return new PartitionChange(fetchedPartitions, removedPartitions);
     }
 
-    private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
-        Properties consumerProps = new Properties();
-        deepCopyProperties(properties, consumerProps);
-        // set client id prefix
-        String clientIdPrefix =
-                
consumerProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
-        consumerProps.setProperty(
-                ConsumerConfig.CLIENT_ID_CONFIG, clientIdPrefix + 
"-enumerator-consumer");
-        consumerProps.setProperty(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        consumerProps.setProperty(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        // Disable auto topic creation.
-        
consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, 
"false");
-        return new KafkaConsumer<>(consumerProps);
-    }
-
     private AdminClient getKafkaAdminClient() {
         Properties adminClientProps = new Properties();
         deepCopyProperties(properties, adminClientProps);
@@ -434,7 +412,7 @@ public class KafkaSourceEnumerator
 
     private OffsetsInitializer.PartitionOffsetsRetriever getOffsetsRetriever() 
{
         String groupId = 
properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-        return new PartitionOffsetsRetrieverImpl(consumer, adminClient, 
groupId);
+        return new PartitionOffsetsRetrieverImpl(adminClient, groupId);
     }
 
     /**
@@ -514,13 +492,10 @@ public class KafkaSourceEnumerator
     @VisibleForTesting
     public static class PartitionOffsetsRetrieverImpl
             implements OffsetsInitializer.PartitionOffsetsRetriever, 
AutoCloseable {
-        private final KafkaConsumer<?, ?> consumer;
         private final AdminClient adminClient;
         private final String groupId;
 
-        public PartitionOffsetsRetrieverImpl(
-                KafkaConsumer<?, ?> consumer, AdminClient adminClient, String 
groupId) {
-            this.consumer = consumer;
+        public PartitionOffsetsRetrieverImpl(AdminClient adminClient, String 
groupId) {
             this.adminClient = adminClient;
             this.groupId = groupId;
         }
@@ -547,6 +522,7 @@ public class KafkaSourceEnumerator
                                 })
                         .get();
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 throw new FlinkRuntimeException(
                         "Interrupted while listing offsets for consumer group 
" + groupId, e);
             } catch (ExecutionException e) {
@@ -558,25 +534,96 @@ public class KafkaSourceEnumerator
             }
         }
 
+        /**
+         * List offsets for the specified partitions and OffsetSpec. This 
operation enables to find
+         * the beginning offset, end offset as well as the offset matching a 
timestamp in
+         * partitions.
+         *
+         * @see KafkaAdminClient#listOffsets(Map)
+         * @param topicPartitionOffsets The mapping from partition to the 
OffsetSpec to look up.
+         * @return The list offsets result.
+         */
+        private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
listOffsets(
+                Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
+            try {
+                return adminClient
+                        .listOffsets(topicPartitionOffsets)
+                        .all()
+                        .thenApply(
+                                result -> {
+                                    Map<TopicPartition, 
ListOffsetsResult.ListOffsetsResultInfo>
+                                            offsets = new HashMap<>();
+                                    result.forEach(
+                                            (tp, listOffsetsResultInfo) -> {
+                                                if (listOffsetsResultInfo != 
null) {
+                                                    offsets.put(tp, 
listOffsetsResultInfo);
+                                                }
+                                            });
+                                    return offsets;
+                                })
+                        .get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new FlinkRuntimeException(
+                        "Interrupted while listing offsets for topic 
partitions: "
+                                + topicPartitionOffsets,
+                        e);
+            } catch (ExecutionException e) {
+                throw new FlinkRuntimeException(
+                        "Failed to list offsets for topic partitions: "
+                                + topicPartitionOffsets
+                                + " due to",
+                        e);
+            }
+        }
+
+        private Map<TopicPartition, Long> listOffsets(
+                Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
+            return listOffsets(
+                            partitions.stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    partition -> partition, __ 
-> offsetSpec)))
+                    .entrySet().stream()
+                    .collect(
+                            Collectors.toMap(
+                                    Map.Entry::getKey, entry -> 
entry.getValue().offset()));
+        }
+
         @Override
         public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions) {
-            return consumer.endOffsets(partitions);
+            return listOffsets(partitions, OffsetSpec.latest());
         }
 
         @Override
         public Map<TopicPartition, Long> 
beginningOffsets(Collection<TopicPartition> partitions) {
-            return consumer.beginningOffsets(partitions);
+            return listOffsets(partitions, OffsetSpec.earliest());
         }
 
         @Override
         public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
                 Map<TopicPartition, Long> timestampsToSearch) {
-            return consumer.offsetsForTimes(timestampsToSearch);
+            return listOffsets(
+                            timestampsToSearch.entrySet().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    Map.Entry::getKey,
+                                                    entry ->
+                                                            
OffsetSpec.forTimestamp(
+                                                                    
entry.getValue()))))
+                    .entrySet().stream()
+                    .collect(
+                            Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    entry ->
+                                            new OffsetAndTimestamp(
+                                                    entry.getValue().offset(),
+                                                    
entry.getValue().timestamp(),
+                                                    
entry.getValue().leaderEpoch())));
         }
 
         @Override
         public void close() throws Exception {
-            consumer.close(Duration.ZERO);
             adminClient.close(Duration.ZERO);
         }
     }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
index 9774cb8..6de272d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
@@ -24,7 +24,6 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
@@ -34,7 +33,8 @@ import java.util.Collection;
 import java.util.Map;
 
 /**
- * A interface for users to specify the starting / stopping offset of a {@link 
KafkaPartitionSplit}.
+ * An interface for users to specify the starting / stopping offset of a {@link
+ * KafkaPartitionSplit}.
  *
  * @see ReaderHandledOffsetsInitializer
  * @see SpecifiedOffsetsInitializer
@@ -85,13 +85,13 @@ public interface OffsetsInitializer extends Serializable {
          */
         Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> 
partitions);
 
-        /** @see KafkaConsumer#endOffsets(Collection) */
+        /** List end offsets for the specified partitions. */
         Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions);
 
-        /** @see KafkaConsumer#beginningOffsets(Collection) */
+        /** List beginning offsets for the specified partitions. */
         Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> 
partitions);
 
-        /** @see KafkaConsumer#offsetsForTimes(Map) */
+        /** List offsets matching a timestamp for the specified partitions. */
         Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
                 Map<TopicPartition, Long> timestampsToSearch);
     }
@@ -130,7 +130,7 @@ public interface OffsetsInitializer extends Serializable {
      * @param timestamp the timestamp to start the consumption.
      * @return an {@link OffsetsInitializer} which initializes the offsets 
based on the given
      *     timestamp.
-     * @see KafkaConsumer#offsetsForTimes(Map)
+     * @see KafkaAdminClient#listOffsets(Map)
      */
     static OffsetsInitializer timestamp(long timestamp) {
         return new TimestampOffsetsInitializer(timestamp);
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 9216172..2a3200b 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.mock.Whitebox;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.AfterClass;
@@ -313,15 +312,8 @@ public class KafkaEnumeratorTest {
                     defaultTimeoutMs,
                     Whitebox.getInternalState(adminClient, 
"defaultApiTimeoutMs"));
 
-            KafkaConsumer<?, ?> consumer =
-                    (KafkaConsumer<?, ?>) 
Whitebox.getInternalState(enumerator, "consumer");
-            assertNotNull(consumer);
-            clientId = (String) Whitebox.getInternalState(consumer, 
"clientId");
             assertNotNull(clientId);
             assertTrue(clientId.startsWith(clientIdPrefix));
-            assertEquals(
-                    (long) defaultTimeoutMs,
-                    Whitebox.getInternalState(consumer, "requestTimeoutMs"));
         }
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
index 6c696f0..4b74bbe 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
@@ -51,9 +51,7 @@ public class OffsetsInitializerTest {
         KafkaSourceTestEnv.setupTopic(TOPIC2, false, false, 
KafkaSourceTestEnv::getRecordsForTopic);
         retriever =
                 new KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl(
-                        KafkaSourceTestEnv.getConsumer(),
-                        KafkaSourceTestEnv.getAdminClient(),
-                        KafkaSourceTestEnv.GROUP_ID);
+                        KafkaSourceTestEnv.getAdminClient(), 
KafkaSourceTestEnv.GROUP_ID);
     }
 
     @AfterClass

Reply via email to