This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5ac2724c83e [improve][io] Upgrade Kafka client and compatible Confluent platform version (#24201)
5ac2724c83e is described below

commit 5ac2724c83e5825cf9748477de0ab7c9981ac06f
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Wed Apr 23 16:44:21 2025 +0300

    [improve][io] Upgrade Kafka client and compatible Confluent platform version (#24201)
    
    (cherry picked from commit 10c65598e5be2b1c889832078b59d7ef89d10c89)
---
 pom.xml                                            |  4 +--
 .../io/kafka/connect/PulsarOffsetBackingStore.java |  8 +++++
 .../integration/io/sinks/KafkaSinkTester.java      |  6 ++--
 .../io/sources/AvroKafkaSourceTest.java            | 40 +++++++++++++---------
 .../integration/io/sources/KafkaSourceTester.java  |  4 +--
 5 files changed, 39 insertions(+), 23 deletions(-)

diff --git a/pom.xml b/pom.xml
index b138a512c2c..0d068cdff15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,7 +180,7 @@ flexible messaging model and an intuitive client API.</description>
     <hbc-core.version>2.2.0</hbc-core.version>
     <cassandra.version>3.11.2</cassandra.version>
     <aerospike-client.version>4.5.0</aerospike-client.version>
-    <kafka-client.version>3.4.0</kafka-client.version>
+    <kafka-client.version>3.8.1</kafka-client.version>
     <rabbitmq-client.version>5.18.0</rabbitmq-client.version>
     <aws-sdk.version>1.12.638</aws-sdk.version>
     <avro.version>1.11.4</avro.version>
@@ -210,7 +210,7 @@ flexible messaging model and an intuitive client API.</description>
     <guava.version>32.1.2-jre</guava.version>
     <jcip.version>1.0</jcip.version>
     <prometheus-jmx.version>0.16.1</prometheus-jmx.version>
-    <confluent.version>6.2.8</confluent.version>
+    <confluent.version>7.8.2</confluent.version>
     <aircompressor.version>0.27</aircompressor.version>
     <asynchttpclient.version>2.12.4</asynchttpclient.version>
     <commons-lang3.version>3.11</commons-lang3.version>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 02f315af68f..044885b541f 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -33,6 +33,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -260,4 +261,11 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
             return Collections.emptyMap();
         }
     }
+
+    @Override
+    public Set<Map<String, Object>> connectorPartitions(String connectorName) {
+        // Partition listing (added in Kafka for KIP-875: First-class offsets support in Kafka Connect)
+        // is not implemented by this store; return an empty set rather than null so callers can iterate.
+        return Collections.emptySet();
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
index d474efaadfe..dbcb1639c11 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
@@ -42,7 +42,7 @@ import org.testcontainers.utility.DockerImageName;
  */
 @Slf4j
 public class KafkaSinkTester extends SinkTester<KafkaContainer> {
-    public static final String CONFLUENT_PLATFORM_VERSION = 
System.getProperty("confluent.version", "6.2.8");
+    public static final String CONFLUENT_PLATFORM_VERSION = 
System.getProperty("confluent.version", "7.8.2");
 
     private final String kafkaTopicName;
     private KafkaConsumer<String, String> kafkaConsumer;
@@ -78,8 +78,8 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
         ExecResult execResult = serviceContainer.execInContainer(
                 "/usr/bin/kafka-topics",
                 "--create",
-                "--zookeeper",
-                "localhost:2181",
+                "--bootstrap-server",
+                "localhost:9092",
                 "--partitions",
                 "1",
                 "--replication-factor",
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
index 913b4e37674..c9a1233091f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
@@ -18,8 +18,20 @@
  */
 package org.apache.pulsar.tests.integration.io.sources;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.common.collect.ImmutableMap;
 import com.google.gson.Gson;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -29,11 +41,15 @@ import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.common.policies.data.SourceStatusUtil;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
@@ -48,18 +64,6 @@ import org.testcontainers.images.builder.Transferable;
 import org.testcontainers.utility.DockerImageName;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.SourceStatus;
-
-import static org.testng.Assert.*;
 
 /**
  * A tester for testing kafka source with Avro Messages.
@@ -71,7 +75,7 @@ import static org.testng.Assert.*;
  */
 @Slf4j
 public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {
-    public static final String CONFLUENT_PLATFORM_VERSION = 
System.getProperty("confluent.version", "6.2.8");
+    public static final String CONFLUENT_PLATFORM_VERSION = 
System.getProperty("confluent.version", "7.8.2");
 
     private static final String SOURCE_TYPE = "kafka";
 
@@ -162,8 +166,8 @@ public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {
         ExecResult execResult = kafkaContainer.execInContainer(
             "/usr/bin/kafka-topics",
             "--create",
-            "--zookeeper",
-                getZooKeeperAddressInDockerNetwork(),
+            "--bootstrap-server",
+            getBootstrapServersOnDockerNetwork(),
             "--partitions",
             "1",
             "--replication-factor",
@@ -393,7 +397,11 @@ public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {
         log.info("script results: "+execResult.getStdout());
         log.info("script stderr: "+execResult.getStderr());
         assertTrue(execResult.getStdout().contains("Closing the Kafka 
producer"), execResult.getStdout()+" "+execResult.getStderr());
-        assertTrue(execResult.getStderr().isEmpty(), execResult.getStderr());
+        // filter out the SLF4J warnings
+        String stderrFiltered = execResult.getStderr()
+                .replaceAll("(?m)^SLF4J: .*?[\\r\\n]+", "")
+                .trim();
+        assertTrue(stderrFiltered.isEmpty(), stderrFiltered);
 
         log.info("Successfully produced {} messages to kafka topic {}", 
numMessages, kafkaTopicName);
         return written;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
index 9eab0084091..57c4c340792 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
@@ -76,8 +76,8 @@ public class KafkaSourceTester extends SourceTester<KafkaContainer> {
         ExecResult execResult = kafkaContainer.execInContainer(
             "/usr/bin/kafka-topics",
             "--create",
-            "--zookeeper",
-            "localhost:2181",
+            "--bootstrap-server",
+            "localhost:9092",
             "--partitions",
             "1",
             "--replication-factor",

Reply via email to