This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new ac98345b [FLINK-37583] Upgrade to Kafka 4.0.0 client.
ac98345b is described below

commit ac98345ba1bc94f0532feccc0712ca61f4ee185a
Author: Thomas Cooper <c...@tomcooper.dev>
AuthorDate: Fri Mar 28 11:07:36 2025 +0100

    [FLINK-37583] Upgrade to Kafka 4.0.0 client.
    
    Note that this update will make the connector incompatible with Kafka
    clusters running Kafka version 2.0 and older.
    
    Signed-off-by: Thomas Cooper <c...@tomcooper.dev>
---
 .gitignore                                                    |  1 +
 docs/content.zh/docs/connectors/datastream/kafka.md           |  2 +-
 docs/content/docs/connectors/datastream/kafka.md              |  2 +-
 flink-connector-kafka/.gitignore                              |  1 +
 .../dynamic/metadata/SingleClusterTopicMetadataService.java   |  4 ++--
 .../kafka/source/enumerator/KafkaSourceEnumerator.java        |  9 ++++-----
 .../connector/kafka/sink/KafkaWriterFaultToleranceITCase.java |  6 +++++-
 .../flink/connector/kafka/testutils/KafkaSourceTestEnv.java   | 11 ++++++-----
 .../streaming/connectors/kafka/KafkaTestEnvironmentImpl.java  |  9 +++++++--
 flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE  |  2 +-
 pom.xml                                                       |  2 +-
 tools/ci/log4j.properties                                     |  4 ++++
 12 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/.gitignore b/.gitignore
index 485c27aa..43b09a54 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@ scalastyle-output.xml
 .classpath
 .idea/*
 !.idea/vcs.xml
+.vscode
 .metadata
 .settings
 .project
diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md b/docs/content.zh/docs/connectors/datastream/kafka.md
index 4a90ece3..a80d3371 100644
--- a/docs/content.zh/docs/connectors/datastream/kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/kafka.md
@@ -33,7 +33,7 @@ Flink 提供了 [Apache Kafka](https://kafka.apache.org) 连接器使用精确
 
 Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。
 该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。
-当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。
+当前 Kafka client 向后兼容 2.1.0 或更高版本的 Kafka broker。
 有关 Kafka 兼容性的更多细节,请参考 [Kafka 官方文档](https://kafka.apache.org/protocol.html#protocol_compatibility)。
 
 {{< connector_artifact flink-connector-kafka kafka >}}
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index bddcefec..d4b1ba22 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -33,7 +33,7 @@ Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading
 
 Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client.
 The version of the client it uses may change between Flink releases.
-Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
+Modern Kafka clients are backwards compatible with broker versions 2.1.0 or later.
 For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
 
 {{< connector_artifact flink-connector-kafka kafka >}}
diff --git a/flink-connector-kafka/.gitignore b/flink-connector-kafka/.gitignore
new file mode 100644
index 00000000..69e9a73b
--- /dev/null
+++ b/flink-connector-kafka/.gitignore
@@ -0,0 +1 @@
+/latest_offset_resume_topic*
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
index 6cef3ab3..1f2f0fd1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
@@ -72,8 +72,8 @@ public class SingleClusterTopicMetadataService implements KafkaMetadataService {
     @Override
     public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
         try {
-            return getAdminClient().describeTopics(new ArrayList<>(streamIds)).all().get().keySet()
-                    .stream()
+            return getAdminClient().describeTopics(new ArrayList<>(streamIds)).allTopicNames().get()
+                    .keySet().stream()
                     .collect(Collectors.toMap(topic -> topic, this::createKafkaStream));
         } catch (InterruptedException | ExecutionException e) {
             throw new RuntimeException("Fetching all streams failed", e);
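
For context, Kafka 4.0 removes the deprecated DescribeTopicsResult.all()
accessor used on the old line above; allTopicNames() is the replacement and
returns a KafkaFuture<Map<String, TopicDescription>> keyed by topic name. A
minimal sketch of the new call shape, assuming an existing AdminClient; the
topic names are placeholders:

    // Kafka 4.0 Admin API: all() is gone, allTopicNames() keys the result by topic name.
    // get() may throw InterruptedException or ExecutionException.
    Map<String, TopicDescription> descriptions =
            adminClient
                    .describeTopics(Arrays.asList("topic-a", "topic-b"))
                    .allTopicNames()
                    .get();
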
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 10025fa2..f3058193 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -32,7 +32,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -533,12 +533,11 @@ public class KafkaSourceEnumerator
 
         @Override
         public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
-            ListConsumerGroupOffsetsOptions options =
-                    new ListConsumerGroupOffsetsOptions()
-                            .topicPartitions(new ArrayList<>(partitions));
+            ListConsumerGroupOffsetsSpec offsetsSpec =
+                    new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);
             try {
                 return adminClient
-                        .listConsumerGroupOffsets(groupId, options)
+                        .listConsumerGroupOffsets(Collections.singletonMap(groupId, offsetsSpec))
                         .partitionsToOffsetAndMetadata()
                         .thenApply(
                                 result -> {
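
The map-based overload used above replaces the removed
listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions) variant in
Kafka 4.0. A minimal sketch, assuming an existing AdminClient, group id, and
partition collection; the no-argument partitionsToOffsetAndMetadata() is valid
here because exactly one group is queried:

    // Kafka 4.0 shape: one ListConsumerGroupOffsetsSpec per group id.
    ListConsumerGroupOffsetsSpec spec =
            new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);
    Map<TopicPartition, OffsetAndMetadata> committed =
            adminClient
                    .listConsumerGroupOffsets(Collections.singletonMap(groupId, spec))
                    .partitionsToOffsetAndMetadata()
                    .get();
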
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
index 4a7d25c6..86fc5846 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
@@ -40,7 +40,11 @@ public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase {
     private static final String INIT_KAFKA_RETRIES = "0";
     private static final String INIT_KAFKA_REQUEST_TIMEOUT_MS = "1000";
     private static final String INIT_KAFKA_MAX_BLOCK_MS = "1000";
-    private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1000";
+    /**
+     * The delivery timeout has to be greater than the request timeout, as the latter is part of
+     * the former; this is enforced by a runtime check when the producer is configured.
+     */
+    private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1500";
 
     @RegisterExtension
     public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
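
The new constant satisfies the Kafka producer's configuration-time constraint
delivery.timeout.ms >= linger.ms + request.timeout.ms: with the test's request
timeout of 1000 ms and the default linger of 0 ms, an explicitly set delivery
timeout below 1000 ms would make the KafkaProducer constructor throw a
ConfigException. A minimal illustration, showing only the two relevant
properties:

    Properties props = new Properties();
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
    // 1500 >= 1000 (request timeout) + 0 (default linger),
    // so the producer's configuration check passes.
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1500");
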
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
index d82425f5..ed4a10ef 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
@@ -22,7 +22,7 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
 
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -238,10 +239,10 @@ public class KafkaSourceTestEnv extends KafkaTestBase {
         Map<TopicPartition, OffsetAndMetadata> toVerify =
                 adminClient
                         .listConsumerGroupOffsets(
-                                GROUP_ID,
-                                new ListConsumerGroupOffsetsOptions()
-                                        .topicPartitions(
-                                                new ArrayList<>(committedOffsets.keySet())))
+                                Collections.singletonMap(
+                                        GROUP_ID,
+                                        new ListConsumerGroupOffsetsSpec()
+                                                .topicPartitions(committedOffsets.keySet())))
                         .partitionsToOffsetAndMetadata()
                         .get();
         assertThat(toVerify).as("The offsets are not committed").isEqualTo(committedOffsets);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0a65e912..9c360467 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -282,8 +282,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
         @Override
         public Long getCommittedOffset(String topicName, int partition) {
-            OffsetAndMetadata committed =
-                    offsetClient.committed(new TopicPartition(topicName, partition));
+            TopicPartition topicPartition = new TopicPartition(topicName, partition);
+            Map<TopicPartition, OffsetAndMetadata> committedMap =
+                    offsetClient.committed(Collections.singleton(topicPartition));
+            if (committedMap == null) {
+                return null;
+            }
+            OffsetAndMetadata committed = committedMap.get(topicPartition);
             return (committed != null) ? committed.offset() : null;
         }
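
Kafka 4.0 removes the single-partition Consumer.committed(TopicPartition)
overload, hence the set-based call above; the returned map can hold a null
value for a partition with no committed offset, which is why the null checks
are kept. A minimal sketch, assuming an existing consumer; the topic name is
a placeholder:

    TopicPartition tp = new TopicPartition("example-topic", 0);
    // The set-based overload returns a map; absent offsets map to null.
    Map<TopicPartition, OffsetAndMetadata> committed =
            consumer.committed(Collections.singleton(tp));
    OffsetAndMetadata meta = committed.get(tp);
    Long offset = (meta != null) ? meta.offset() : null;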
 
diff --git a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
index 7516ca34..75ecac95 100644
--- a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.kafka:kafka-clients:3.9.1
+- org.apache.kafka:kafka-clients:4.0.0
diff --git a/pom.xml b/pom.xml
index a14d8ec7..7421fe2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@ under the License.
                <!-- Main Dependencies -->
                <confluent.version>7.9.2</confluent.version>
                <flink.version>2.0.0</flink.version>
-               <kafka.version>3.9.1</kafka.version>
+               <kafka.version>4.0.0</kafka.version>
 
                <!-- Other Dependencies -->
                <avro.version>1.12.0</avro.version>
diff --git a/tools/ci/log4j.properties b/tools/ci/log4j.properties
index 25ef1cf9..d518a5f0 100644
--- a/tools/ci/log4j.properties
+++ b/tools/ci/log4j.properties
@@ -44,6 +44,10 @@ logger.flink.level = WARN
 logger.flinkconnector.name = org.apache.flink.connector
 logger.flinkconnector.level = INFO
 
+# Allow the LicenseChecker to log its output
+logger.licensechecker.name = org.apache.flink.tools.ci.licensecheck
+logger.licensechecker.level = INFO
+
 # Kafka producer and consumer level
 logger.kafka.name = org.apache.kafka
 logger.kafka.level = OFF
