This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 5ac2724c83e [improve][io] Upgrade Kafka client and compatible Confluent platform version (#24201) 5ac2724c83e is described below commit 5ac2724c83e5825cf9748477de0ab7c9981ac06f Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Wed Apr 23 16:44:21 2025 +0300 [improve][io] Upgrade Kafka client and compatible Confluent platform version (#24201) (cherry picked from commit 10c65598e5be2b1c889832078b59d7ef89d10c89) --- pom.xml | 4 +-- .../io/kafka/connect/PulsarOffsetBackingStore.java | 8 +++++ .../integration/io/sinks/KafkaSinkTester.java | 6 ++-- .../io/sources/AvroKafkaSourceTest.java | 40 +++++++++++++--------- .../integration/io/sources/KafkaSourceTester.java | 4 +-- 5 files changed, 39 insertions(+), 23 deletions(-) diff --git a/pom.xml b/pom.xml index b138a512c2c..0d068cdff15 100644 --- a/pom.xml +++ b/pom.xml @@ -180,7 +180,7 @@ flexible messaging model and an intuitive client API.</description> <hbc-core.version>2.2.0</hbc-core.version> <cassandra.version>3.11.2</cassandra.version> <aerospike-client.version>4.5.0</aerospike-client.version> - <kafka-client.version>3.4.0</kafka-client.version> + <kafka-client.version>3.8.1</kafka-client.version> <rabbitmq-client.version>5.18.0</rabbitmq-client.version> <aws-sdk.version>1.12.638</aws-sdk.version> <avro.version>1.11.4</avro.version> @@ -210,7 +210,7 @@ flexible messaging model and an intuitive client API.</description> <guava.version>32.1.2-jre</guava.version> <jcip.version>1.0</jcip.version> <prometheus-jmx.version>0.16.1</prometheus-jmx.version> - <confluent.version>6.2.8</confluent.version> + <confluent.version>7.8.2</confluent.version> <aircompressor.version>0.27</aircompressor.version> <asynchttpclient.version>2.12.4</asynchttpclient.version> <commons-lang3.version>3.11</commons-lang3.version> diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java index 02f315af68f..044885b541f 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -260,4 +261,11 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore { return Collections.emptyMap(); } } + + @Override + public Set<Map<String, Object>> connectorPartitions(String connectorName) { + // skip implementing this method which was added in Kafka for + // KIP-875: First-class offsets support in Kafka Connect + return null; + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java index d474efaadfe..dbcb1639c11 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java @@ -42,7 +42,7 @@ import org.testcontainers.utility.DockerImageName; */ @Slf4j public class KafkaSinkTester extends SinkTester<KafkaContainer> { - public static final String CONFLUENT_PLATFORM_VERSION = System.getProperty("confluent.version", "6.2.8"); + public static final String CONFLUENT_PLATFORM_VERSION = System.getProperty("confluent.version", "7.8.2"); private final String kafkaTopicName; private KafkaConsumer<String, String> kafkaConsumer; @@ -78,8 +78,8 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> { ExecResult execResult = serviceContainer.execInContainer( "/usr/bin/kafka-topics", "--create", - "--zookeeper", - "localhost:2181", + "--bootstrap-server", + "localhost:9092", "--partitions", "1", "--replication-factor", diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java index 913b4e37674..c9a1233091f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java @@ -18,8 +18,20 @@ */ package org.apache.pulsar.tests.integration.io.sources; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -29,11 +41,15 @@ import org.apache.avro.io.JsonEncoder; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.SourceStatus; import org.apache.pulsar.common.policies.data.SourceStatusUtil; import org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; @@ -48,18 +64,6 @@ import org.testcontainers.images.builder.Transferable; import org.testcontainers.utility.DockerImageName; import org.testng.Assert; import org.testng.annotations.Test; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.TimeUnit; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.SourceStatus; - -import static org.testng.Assert.*; /** * A tester for testing kafka source with Avro Messages. @@ -71,7 +75,7 @@ import static org.testng.Assert.*; */ @Slf4j public class AvroKafkaSourceTest extends PulsarFunctionsTestBase { - public static final String CONFLUENT_PLATFORM_VERSION = System.getProperty("confluent.version", "6.2.8"); + public static final String CONFLUENT_PLATFORM_VERSION = System.getProperty("confluent.version", "7.8.2"); private static final String SOURCE_TYPE = "kafka"; @@ -162,8 +166,8 @@ public class AvroKafkaSourceTest extends PulsarFunctionsTestBase { ExecResult execResult = kafkaContainer.execInContainer( "/usr/bin/kafka-topics", "--create", - "--zookeeper", - getZooKeeperAddressInDockerNetwork(), + "--bootstrap-server", + getBootstrapServersOnDockerNetwork(), "--partitions", "1", "--replication-factor", @@ -393,7 +397,11 @@ public class AvroKafkaSourceTest extends PulsarFunctionsTestBase { log.info("script results: "+execResult.getStdout()); log.info("script stderr: "+execResult.getStderr()); assertTrue(execResult.getStdout().contains("Closing the Kafka producer"), execResult.getStdout()+" "+execResult.getStderr()); - assertTrue(execResult.getStderr().isEmpty(), execResult.getStderr()); + // filter out the SLF4J warnings + String stderrFiltered = execResult.getStderr() + .replaceAll("(?m)^SLF4J: .*?[\\r\\n]+", "") + .trim(); + assertTrue(stderrFiltered.isEmpty(), stderrFiltered); log.info("Successfully produced {} messages to kafka topic {}", numMessages, kafkaTopicName); return written; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java index 9eab0084091..57c4c340792 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java @@ -76,8 +76,8 @@ public class KafkaSourceTester extends SourceTester<KafkaContainer> { ExecResult execResult = kafkaContainer.execInContainer( "/usr/bin/kafka-topics", "--create", - "--zookeeper", - "localhost:2181", + "--bootstrap-server", + "localhost:9092", "--partitions", "1", "--replication-factor",