This is an automated email from the ASF dual-hosted git repository.

FrankChen021 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new eb34e2a8ef1 build(deps): Kafka client 4.2.0 upgrade (#19441)
eb34e2a8ef1 is described below

commit eb34e2a8ef19c11474cf547d1712e7719bae62e2
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Wed May 13 07:47:14 2026 +0530

    build(deps): Kafka client 4.2.0 upgrade (#19441)
    
    * Bump Kafka client to 4.2.0 (#19322)
    
    * Migrate Kafka Consumer.poll(long) to poll(Duration) for Kafka 4.x
    
    * Add EmbeddedKafkaBroker test helper using Testcontainers
    
    * Migrate kafka-indexing-service tests to EmbeddedKafkaBroker; drop TestBroker
    
    * Migrate TestKafkaExtractionCluster to Testcontainers; bump licenses
    
    * Drop unused throws Exception from setupClass in Kafka tests
    
    * Add curator-test test dep to kafka-indexing-service for embedded ZK
---
 extensions-core/kafka-extraction-namespace/pom.xml |  24 ++--
 .../query/lookup/KafkaLookupExtractorFactory.java  |   3 +-
 .../query/lookup/TestKafkaExtractionCluster.java   |  60 ++------
 extensions-core/kafka-indexing-service/pom.xml     |  16 +--
 .../kafkainput/KafkaStringHeaderFormatTest.java    |  19 +--
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  21 +--
 .../indexing/kafka/KafkaRecordSupplierTest.java    |  24 +---
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java |  16 +--
 .../indexing/kafka/simulate/KafkaResource.java     |   2 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      |  33 ++---
 .../indexing/kafka/test/EmbeddedKafkaBroker.java   | 118 ++++++++++++++++
 .../druid/indexing/kafka/test/TestBroker.java      | 152 ---------------------
 licenses.yaml                                      |  15 +-
 pom.xml                                            |   2 +-
 14 files changed, 181 insertions(+), 324 deletions(-)

diff --git a/extensions-core/kafka-extraction-namespace/pom.xml 
b/extensions-core/kafka-extraction-namespace/pom.xml
index ff45e8bdfdf..89655f33099 100644
--- a/extensions-core/kafka-extraction-namespace/pom.xml
+++ b/extensions-core/kafka-extraction-namespace/pom.xml
@@ -114,17 +114,6 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-test</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.13</artifactId>
-      <version>${apache.kafka.version}</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>mx4j</groupId>
       <artifactId>mx4j-tools</artifactId>
@@ -136,10 +125,15 @@
       <artifactId>easymock</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency><!-- Required by dependency check, used by kafka_2.13 -->
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <version>${scala.library.version}</version>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers-kafka</artifactId>
+      <version>${testcontainers.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git 
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
 
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index e03c96fa318..041a4cecd71 100644
--- 
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++ 
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -48,6 +48,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.validation.constraints.Min;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
@@ -181,7 +182,7 @@ public class KafkaLookupExtractorFactory implements 
LookupExtractorFactory
               if (executorService.isShutdown()) {
                 break;
               }
-              final ConsumerRecords<String, String> records = consumer.poll(1000);
+              final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
               startingReads.countDown();
 
               for (final ConsumerRecord<String, String> record : records) {
diff --git 
a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
 
b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
index 26f6ed0f2f4..417166f7b31 100644
--- 
a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
+++ 
b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -26,9 +26,6 @@ import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import org.apache.curator.test.TestingCluster;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.initialization.Initialization;
 import org.apache.druid.java.util.common.ISE;
@@ -42,7 +39,6 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.Time;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
 import org.junit.After;
@@ -51,16 +47,14 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import scala.Some;
+import org.testcontainers.kafka.KafkaContainer;
 
 import javax.annotation.Nonnull;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.Assert.assertThrows;
 
@@ -78,8 +72,10 @@ public class TestKafkaExtractionCluster
 
   private final Closer closer = Closer.create();
 
-  private TestingCluster zkServer;
-  private KafkaServer kafkaServer;
+  private static final String KAFKA_IMAGE =
+      System.getProperty("druid.testing.kafka.image", "apache/kafka:4.2.0");
+
+  private KafkaContainer kafkaServer;
   private Injector injector;
   private ObjectMapper mapper;
   private KafkaLookupExtractorFactory factory;
@@ -95,23 +91,9 @@ public class TestKafkaExtractionCluster
   @Before
   public void setUp() throws Exception
   {
-    zkServer = new TestingCluster(1);
-    zkServer.start();
-    closer.register(() -> {
-      zkServer.stop();
-    });
-
-    kafkaServer = new KafkaServer(
-          getBrokerProperties(),
-          Time.SYSTEM,
-          Some.apply(StringUtils.format("TestingBroker[%d]-", 1)),
-          false);
-
-    kafkaServer.startup();
-    closer.register(() -> {
-      kafkaServer.shutdown();
-      kafkaServer.awaitShutdown();
-    });
+    kafkaServer = new KafkaContainer(KAFKA_IMAGE);
+    kafkaServer.start();
+    closer.register(() -> kafkaServer.stop());
     log.info("---------------------------Started Kafka Broker ---------------------------");
 
     log.info("---------------------------Publish Messages to topic-----------------------");
@@ -163,8 +145,7 @@ public class TestKafkaExtractionCluster
   private Map<String, String> getConsumerProperties()
   {
     final Map<String, String> props = new HashMap<>(KAFKA_PROPERTIES);
-    int port = kafkaServer.advertisedListeners().apply(0).port();
-    props.put("bootstrap.servers", StringUtils.format("127.0.0.1:%d", port));
+    props.put("bootstrap.servers", kafkaServer.getBootstrapServers());
     return props;
   }
 
@@ -177,26 +158,6 @@ public class TestKafkaExtractionCluster
     }
   }
 
-  @Nonnull
-  private KafkaConfig getBrokerProperties() throws IOException
-  {
-    final Properties serverProperties = new Properties();
-    serverProperties.putAll(KAFKA_PROPERTIES);
-    serverProperties.put("broker.id", "0");
-    serverProperties.put("zookeeper.connect", zkServer.getConnectString());
-    serverProperties.put("port", 
String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
-    serverProperties.put("auto.create.topics.enable", "true");
-    serverProperties.put("log.dir", 
temporaryFolder.newFolder().getAbsolutePath());
-    serverProperties.put("num.partitions", "1");
-    serverProperties.put("offsets.topic.replication.factor", "1");
-    serverProperties.put("default.replication.factor", "1");
-    serverProperties.put("log.cleaner.enable", "true");
-    serverProperties.put("advertised.host.name", "localhost");
-    serverProperties.put("zookeeper.session.timeout.ms", "30000");
-    serverProperties.put("zookeeper.sync.time.ms", "200");
-    return new KafkaConfig(serverProperties);
-  }
-
   @After
   public void tearDown() throws Exception
   {
@@ -207,8 +168,7 @@ public class TestKafkaExtractionCluster
   {
     final Properties kafkaProducerProperties = new Properties();
     kafkaProducerProperties.putAll(KAFKA_PROPERTIES);
-    int port = kafkaServer.advertisedListeners().apply(0).port();
-    kafkaProducerProperties.put("bootstrap.servers", 
StringUtils.format("127.0.0.1:%d", port));
+    kafkaProducerProperties.put("bootstrap.servers", 
kafkaServer.getBootstrapServers());
     kafkaProducerProperties.put("key.serializer", 
ByteArraySerializer.class.getName());
     kafkaProducerProperties.put("value.serializer", 
ByteArraySerializer.class.getName());
     kafkaProducerProperties.put("acks", "all");
diff --git a/extensions-core/kafka-indexing-service/pom.xml 
b/extensions-core/kafka-indexing-service/pom.xml
index 2acfc2877d7..f86138c8742 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -151,9 +151,8 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.13</artifactId>
-      <version>${apache.kafka.version}</version>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -190,22 +189,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-test</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.easymock</groupId>
       <artifactId>easymock</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency><!-- Required by dependency check, used by kafka_2.13 -->
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <version>${scala.library.version}</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java
index 913c1aca1ad..bea3b8de0f5 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java
@@ -36,6 +36,7 @@ import org.junit.Test;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 
 
 public class KafkaStringHeaderFormatTest
@@ -100,10 +101,10 @@ public class KafkaStringHeaderFormatTest
   {
     String headerLabelPrefix = "test.kafka.header.";
     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<>(
+    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
         "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers
+        null, 0, 0,
+        null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers, 
Optional.empty()
     ));
     List<Pair<String, Object>> expectedResults = Arrays.asList(
         Pair.of("test.kafka.header.encoding", "application/json"),
@@ -151,10 +152,10 @@ public class KafkaStringHeaderFormatTest
 
     String headerLabelPrefix = "test.kafka.header.";
     Headers headers = new RecordHeaders(header);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<>(
+    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
         "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers
+        null, 0, 0,
+        null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers, 
Optional.empty()
     ));
     List<Pair<String, Object>> expectedResults = Arrays.asList(
         Pair.of("test.kafka.header.encoding", "application/json"),
@@ -203,10 +204,10 @@ public class KafkaStringHeaderFormatTest
 
     String headerLabelPrefix = "test.kafka.header.";
     Headers headers = new RecordHeaders(header);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<>(
+    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
         "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers
+        null, 0, 0,
+        null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers, 
Optional.empty()
     ));
     List<Pair<String, Object>> expectedResults = Arrays.asList(
         Pair.of("test.kafka.header.encoding", "?pplic?tion/json"),
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index e3d98588d0a..bb40602be7b 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -37,7 +37,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Injector;
 import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingCluster;
 import org.apache.druid.cli.CliPeon;
 import org.apache.druid.cli.CliPeonTest;
 import org.apache.druid.cli.PeonLoadSpecHolder;
@@ -73,7 +72,7 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
-import org.apache.druid.indexing.kafka.test.TestBroker;
+import org.apache.druid.indexing.kafka.test.EmbeddedKafkaBroker;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -198,8 +197,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
       "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic", 
"kafka.partition", "kafka.offset"
   );
 
-  private static TestingCluster zkServer;
-  private static TestBroker kafkaServer;
+  private static EmbeddedKafkaBroker kafkaServer;
   private static int topicPostfix;
   static final Module TEST_MODULE = new 
SimpleModule("kafkaTestModule").registerSubtypes(
       new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat"),
@@ -288,17 +286,9 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
   }
 
   @BeforeClass
-  public static void setupClass() throws Exception
+  public static void setupClass()
   {
-    zkServer = new TestingCluster(1);
-    zkServer.start();
-
-    kafkaServer = new TestBroker(
-        zkServer.getConnectString(),
-        null,
-        1,
-        ImmutableMap.of("num.partitions", "2")
-    );
+    kafkaServer = new 
EmbeddedKafkaBroker(ImmutableMap.of("KAFKA_NUM_PARTITIONS", "2"));
     kafkaServer.start();
 
     taskExec = MoreExecutors.listeningDecorator(
@@ -345,9 +335,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     kafkaServer.close();
     kafkaServer = null;
-
-    zkServer.stop();
-    zkServer = null;
   }
 
   @Test(timeout = 60_000L)
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 6b1b51474c7..a01ae639c04 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -24,11 +24,10 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
-import org.apache.curator.test.TestingCluster;
 import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.data.input.kafka.KafkaTopicPartition;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
-import org.apache.druid.indexing.kafka.test.TestBroker;
+import org.apache.druid.indexing.kafka.test.EmbeddedKafkaBroker;
 import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.StringUtils;
@@ -79,8 +78,7 @@ public class KafkaRecordSupplierTest
 
   private static String TOPIC = "topic";
   private static int TOPIC_POS_FIX = 0;
-  private static TestingCluster ZK_SERVER;
-  private static TestBroker KAFKA_SERVER;
+  private static EmbeddedKafkaBroker KAFKA_SERVER;
 
   private List<ProducerRecord<byte[], byte[]>> records;
 
@@ -199,19 +197,10 @@ public class KafkaRecordSupplierTest
   }
 
   @BeforeClass
-  public static void setupClass() throws Exception
+  public static void setupClass()
   {
-    ZK_SERVER = new TestingCluster(1);
-    ZK_SERVER.start();
-
-    KAFKA_SERVER = new TestBroker(
-        ZK_SERVER.getConnectString(),
-        null,
-        1,
-        ImmutableMap.of("num.partitions", "2")
-    );
+    KAFKA_SERVER = new 
EmbeddedKafkaBroker(ImmutableMap.of("KAFKA_NUM_PARTITIONS", "2"));
     KAFKA_SERVER.start();
-
   }
 
   @Before
@@ -222,13 +211,10 @@ public class KafkaRecordSupplierTest
   }
 
   @AfterClass
-  public static void tearDownClass() throws Exception
+  public static void tearDownClass()
   {
     KAFKA_SERVER.close();
     KAFKA_SERVER = null;
-
-    ZK_SERVER.stop();
-    ZK_SERVER = null;
   }
 
   @Test
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index dc9c7186815..444f10013e1 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.kafka;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import org.apache.curator.test.TestingCluster;
 import org.apache.druid.client.indexing.SamplerResponse;
 import org.apache.druid.client.indexing.SamplerSpec;
 import org.apache.druid.data.input.impl.FloatDimensionSchema;
@@ -33,7 +32,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.granularity.UniformGranularitySpec;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
-import org.apache.druid.indexing.kafka.test.TestBroker;
+import org.apache.druid.indexing.kafka.test.EmbeddedKafkaBroker;
 import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
 import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
 import org.apache.druid.indexing.overlord.sampler.SamplerException;
@@ -98,8 +97,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
         schema.withTimestamp(new TimestampSpec("kafka.timestamp", "iso", 
null));
       };
 
-  private static TestingCluster zkServer;
-  private static TestBroker kafkaServer;
+  private static EmbeddedKafkaBroker kafkaServer;
 
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String 
topic)
   {
@@ -114,20 +112,16 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
   }
 
   @BeforeClass
-  public static void setupClass() throws Exception
+  public static void setupClass()
   {
-    zkServer = new TestingCluster(1);
-    zkServer.start();
-
-    kafkaServer = new TestBroker(zkServer.getConnectString(), null, 1, 
ImmutableMap.of("num.partitions", "2"));
+    kafkaServer = new 
EmbeddedKafkaBroker(ImmutableMap.of("KAFKA_NUM_PARTITIONS", "2"));
     kafkaServer.start();
   }
 
   @AfterClass
-  public static void tearDownClass() throws Exception
+  public static void tearDownClass()
   {
     kafkaServer.close();
-    zkServer.stop();
   }
 
   @Test
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
index 2df7c6a7be1..27c5fcb65bd 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
@@ -54,7 +54,7 @@ public class KafkaResource extends 
StreamIngestResource<KafkaContainer>
    * defaults to {@code apache/kafka}. Environments that cannot run that
    * image should set the system property to {@code apache/kafka-native}.
    */
-  private static final String KAFKA_IMAGE = System.getProperty("druid.testing.kafka.image", "apache/kafka:4.1.1");
+  private static final String KAFKA_IMAGE = System.getProperty("druid.testing.kafka.image", "apache/kafka:4.2.0");
 
   private EmbeddedDruidCluster cluster;
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c629aa46192..993582693cb 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.curator.test.TestingCluster;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.JsonInputFormat;
@@ -50,7 +49,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskRunner;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
 import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
-import org.apache.druid.indexing.kafka.test.TestBroker;
+import org.apache.druid.indexing.kafka.test.EmbeddedKafkaBroker;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
@@ -122,7 +121,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
-import java.io.IOException;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
@@ -160,8 +158,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
   private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
 
-  private static TestingCluster zkServer;
-  private static TestBroker kafkaServer;
+  private static EmbeddedKafkaBroker kafkaServer;
   private static String kafkaHost;
   private static DataSchema dataSchema;
   private static int topicPostfix;
@@ -207,24 +204,21 @@ public class KafkaSupervisorTest extends EasyMockSupport
   }
 
   @BeforeClass
-  public static void setupClass() throws Exception
+  public static void setupClass()
   {
-    zkServer = new TestingCluster(1);
-    zkServer.start();
-
-    kafkaServer = new TestBroker(
-        zkServer.getConnectString(),
-        null,
-        1,
+    kafkaServer = new EmbeddedKafkaBroker(
         ImmutableMap.of(
-            "num.partitions",
+            "KAFKA_NUM_PARTITIONS",
             String.valueOf(NUM_PARTITIONS),
-            "auto.create.topics.enable",
-            String.valueOf(false)
+            "KAFKA_AUTO_CREATE_TOPICS_ENABLE",
+            String.valueOf(false),
+            // Kafka 4.x defaults log.message.timestamp.after.max.ms to 1h; addSomeEvents emits future timestamps.
+            "KAFKA_LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS",
+            String.valueOf(Long.MAX_VALUE)
         )
     );
     kafkaServer.start();
-    kafkaHost = StringUtils.format("localhost:%d", kafkaServer.getPort());
+    kafkaHost = kafkaServer.getBootstrapServerUrl();
 
     dataSchema = getDataSchema(DATASOURCE);
   }
@@ -258,13 +252,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
   }
 
   @AfterClass
-  public static void tearDownClass() throws IOException
+  public static void tearDownClass()
   {
     kafkaServer.close();
     kafkaServer = null;
-
-    zkServer.stop();
-    zkServer = null;
   }
 
   @Test
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/EmbeddedKafkaBroker.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/EmbeddedKafkaBroker.java
new file mode 100644
index 00000000000..c223337decb
--- /dev/null
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/EmbeddedKafkaBroker.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.test;
+
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.testcontainers.kafka.KafkaContainer;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Embedded Kafka broker for unit tests, backed by Testcontainers {@link KafkaContainer}.
+ */
+public class EmbeddedKafkaBroker implements Closeable
+{
+  private static final String KAFKA_IMAGE =
+      System.getProperty("druid.testing.kafka.image", "apache/kafka:4.2.0");
+
+  private final KafkaContainer container;
+
+  public EmbeddedKafkaBroker()
+  {
+    this(Map.of());
+  }
+
+  /**
+   * @param brokerEnv extra env vars forwarded to the container; the apache/kafka
+   *                  image maps {@code KAFKA_FOO_BAR} to broker config {@code foo.bar}.
+   */
+  public EmbeddedKafkaBroker(Map<String, String> brokerEnv)
+  {
+    this.container = new KafkaContainer(KAFKA_IMAGE);
+    if (brokerEnv != null && !brokerEnv.isEmpty()) {
+      container.withEnv(brokerEnv);
+    }
+  }
+
+  public void start()
+  {
+    container.start();
+  }
+
+  public String getBootstrapServerUrl()
+  {
+    return container.getBootstrapServers();
+  }
+
+  public int getPort()
+  {
+    final String url = getBootstrapServerUrl();
+    final int colon = url.lastIndexOf(':');
+    return Integer.parseInt(url.substring(colon + 1));
+  }
+
+  public Map<String, Object> producerProperties()
+  {
+    final Map<String, Object> props = new HashMap<>();
+    commonClientProperties(props);
+    props.put("key.serializer", ByteArraySerializer.class.getName());
+    props.put("value.serializer", ByteArraySerializer.class.getName());
+    props.put("acks", "all");
+    props.put("enable.idempotence", "true");
+    props.put("transactional.id", 
String.valueOf(ThreadLocalRandom.current().nextInt()));
+    return props;
+  }
+
+  public KafkaProducer<byte[], byte[]> newProducer()
+  {
+    return new KafkaProducer<>(producerProperties());
+  }
+
+  public Map<String, Object> consumerProperties()
+  {
+    final Map<String, Object> props = new 
HashMap<>(KafkaConsumerConfigs.getConsumerProperties());
+    props.put("bootstrap.servers", getBootstrapServerUrl());
+    return props;
+  }
+
+  public Admin newAdminClient()
+  {
+    final Map<String, Object> props = new HashMap<>();
+    commonClientProperties(props);
+    return Admin.create(props);
+  }
+
+  private void commonClientProperties(Map<String, Object> props)
+  {
+    props.put("bootstrap.servers", getBootstrapServerUrl());
+  }
+
+  @Override
+  public void close()
+  {
+    container.stop();
+  }
+}
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
deleted file mode 100644
index 387bba88d48..00000000000
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.kafka.test;
-
-import com.google.common.collect.ImmutableMap;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
-import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.Time;
-import scala.Some;
-
-import javax.annotation.Nullable;
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-public class TestBroker implements Closeable
-{
-  private static final Random RANDOM = ThreadLocalRandom.current();
-  private final String zookeeperConnect;
-  private final File directory;
-  private final boolean directoryCleanup;
-  private final int id;
-  private final Map<String, Object> brokerProps;
-
-  private volatile KafkaServer server;
-
-  public TestBroker(
-      String zookeeperConnect,
-      @Nullable File directory,
-      int id,
-      Map<String, Object> brokerProps
-  )
-  {
-    this.zookeeperConnect = zookeeperConnect;
-    this.directory = directory == null ? FileUtils.createTempDir() : directory;
-    this.directoryCleanup = directory == null;
-    this.id = id;
-    this.brokerProps = brokerProps == null ? ImmutableMap.of() : brokerProps;
-  }
-
-  public void start()
-  {
-    final Properties props = new Properties();
-    props.setProperty("zookeeper.connect", zookeeperConnect);
-    props.setProperty("zookeeper.session.timeout.ms", "30000");
-    props.setProperty("zookeeper.connection.timeout.ms", "30000");
-    props.setProperty("log.dirs", directory.toString());
-    props.setProperty("broker.id", String.valueOf(id));
-    props.setProperty("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
-    props.setProperty("advertised.host.name", "localhost");
-    props.setProperty("transaction.state.log.replication.factor", "1");
-    props.setProperty("offsets.topic.replication.factor", "1");
-    props.setProperty("transaction.state.log.min.isr", "1");
-    props.putAll(brokerProps);
-
-    final KafkaConfig config = new KafkaConfig(props);
-
-    server = new KafkaServer(
-        config,
-        Time.SYSTEM,
-        Some.apply(StringUtils.format("TestingBroker[%d]-", id)),
-        false
-    );
-    server.startup();
-  }
-
-  public int getPort()
-  {
-    return server.advertisedListeners().apply(0).port();
-  }
-
-  public KafkaProducer<byte[], byte[]> newProducer()
-  {
-    return new KafkaProducer<>(producerProperties());
-  }
-
-  public Admin newAdminClient()
-  {
-    return Admin.create(adminClientProperties());
-  }
-
-  Map<String, Object> adminClientProperties()
-  {
-    final Map<String, Object> props = new HashMap<>();
-    commonClientProperties(props);
-    return props;
-  }
-
-  public Map<String, Object> producerProperties()
-  {
-    final Map<String, Object> props = new HashMap<>();
-    commonClientProperties(props);
-    props.put("key.serializer", ByteArraySerializer.class.getName());
-    props.put("value.serializer", ByteArraySerializer.class.getName());
-    props.put("acks", "all");
-    props.put("enable.idempotence", "true");
-    props.put("transactional.id", String.valueOf(RANDOM.nextInt()));
-    return props;
-  }
-
-  void commonClientProperties(Map<String, Object> props)
-  {
-    props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
-  }
-
-  public Map<String, Object> consumerProperties()
-  {
-    final Map<String, Object> props = KafkaConsumerConfigs.getConsumerProperties();
-    props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
-    return props;
-  }
-
-  @Override
-  public void close() throws IOException
-  {
-    if (server != null) {
-      server.shutdown();
-      server.awaitShutdown();
-    }
-    if (directoryCleanup) {
-      FileUtils.deleteDirectory(directory);
-    }
-  }
-}
diff --git a/licenses.yaml b/licenses.yaml
index 3ec887ec5c4..3b76c7490b4 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -3414,7 +3414,7 @@ libraries:
 ---
 
 name: Apache Kafka
-version: 3.9.2
+version: 4.2.0
 license_category: binary
 module: extensions/druid-kafka-indexing-service
 license_name: Apache License version 2.0
@@ -4418,7 +4418,7 @@ name: Apache Kafka
 license_category: binary
 module: extensions/kafka-extraction-namespace
 license_name: Apache License version 2.0
-version: 3.6.1
+version: 4.2.0
 libraries:
   - org.apache.kafka: kafka-clients
 notices:
@@ -4449,17 +4449,6 @@ notices:
 
 ---
 
-name: Scala Library
-license_category: binary
-module: extensions/kafka-extraction-namespace
-license_name: Apache License version 2.0
-copyright: LAMP/EPFL and Lightbend, Inc.
-version: 2.13.16
-libraries:
-  - org.scala-lang: scala-library
-
----
-
 name: Microsoft Azure SDK For Key Vault Core
 license_category: binary
 module: extensions/druid-azure-extensions
diff --git a/pom.xml b/pom.xml
index 7c3752b3181..bf1da17afd6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
         <project.build.resourceEncoding>UTF-8</project.build.resourceEncoding>
         <aether.version>0.9.0.M2</aether.version>
         <apache.curator.version>5.8.0</apache.curator.version>
-        <apache.kafka.version>3.9.2</apache.kafka.version>
+        <apache.kafka.version>4.2.0</apache.kafka.version>
         <!-- when updating apache ranger, verify the usage of aws-bundle-sdk vs aws-logs-sdk
         and update as needed in extensions-contrib/druid-ranger-security/pom.xml  -->
         <apache.ranger.version>2.8.0</apache.ranger.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to