This is an automated email from the ASF dual-hosted git repository.
FrankChen021 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new eb34e2a8ef1 build(deps): Kafka client 4.2.0 upgrade (#19441)
eb34e2a8ef1 is described below
commit eb34e2a8ef19c11474cf547d1712e7719bae62e2
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Wed May 13 07:47:14 2026 +0530
build(deps): Kafka client 4.2.0 upgrade (#19441)
* Bump Kafka client to 4.2.0 (#19322)
* Migrate Kafka Consumer.poll(long) to poll(Duration) for Kafka 4.x
* Add EmbeddedKafkaBroker test helper using Testcontainers
* Migrate kafka-indexing-service tests to EmbeddedKafkaBroker; drop TestBroker
* Migrate TestKafkaExtractionCluster to Testcontainers; bump licenses
* Drop unused throws Exception from setupClass in Kafka tests
* Add curator-test test dep to kafka-indexing-service for embedded ZK
---
extensions-core/kafka-extraction-namespace/pom.xml | 24 ++--
.../query/lookup/KafkaLookupExtractorFactory.java | 3 +-
.../query/lookup/TestKafkaExtractionCluster.java | 60 ++------
extensions-core/kafka-indexing-service/pom.xml | 16 +--
.../kafkainput/KafkaStringHeaderFormatTest.java | 19 +--
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 21 +--
.../indexing/kafka/KafkaRecordSupplierTest.java | 24 +---
.../druid/indexing/kafka/KafkaSamplerSpecTest.java | 16 +--
.../indexing/kafka/simulate/KafkaResource.java | 2 +-
.../kafka/supervisor/KafkaSupervisorTest.java | 33 ++---
.../indexing/kafka/test/EmbeddedKafkaBroker.java | 118 ++++++++++++++++
.../druid/indexing/kafka/test/TestBroker.java | 152 ---------------------
licenses.yaml | 15 +-
pom.xml | 2 +-
14 files changed, 181 insertions(+), 324 deletions(-)
diff --git a/extensions-core/kafka-extraction-namespace/pom.xml
b/extensions-core/kafka-extraction-namespace/pom.xml
index ff45e8bdfdf..89655f33099 100644
--- a/extensions-core/kafka-extraction-namespace/pom.xml
+++ b/extensions-core/kafka-extraction-namespace/pom.xml
@@ -114,17 +114,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.13</artifactId>
- <version>${apache.kafka.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>mx4j</groupId>
<artifactId>mx4j-tools</artifactId>
@@ -136,10 +125,15 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
- <dependency><!-- Required by dependency check, used by kafka_2.13 -->
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.library.version}</version>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers-kafka</artifactId>
+ <version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index e03c96fa318..041a4cecd71 100644
---
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -48,6 +48,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
@@ -181,7 +182,7 @@ public class KafkaLookupExtractorFactory implements
LookupExtractorFactory
if (executorService.isShutdown()) {
break;
}
- final ConsumerRecords<String, String> records =
consumer.poll(1000);
+ final ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
startingReads.countDown();
for (final ConsumerRecord<String, String> record : records) {
diff --git
a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
index 26f6ed0f2f4..417166f7b31 100644
---
a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
+++
b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -26,9 +26,6 @@ import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import org.apache.curator.test.TestingCluster;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.java.util.common.ISE;
@@ -42,7 +39,6 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
@@ -51,16 +47,14 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import scala.Some;
+import org.testcontainers.kafka.KafkaContainer;
import javax.annotation.Nonnull;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ThreadLocalRandom;
import static org.junit.Assert.assertThrows;
@@ -78,8 +72,10 @@ public class TestKafkaExtractionCluster
private final Closer closer = Closer.create();
- private TestingCluster zkServer;
- private KafkaServer kafkaServer;
+ private static final String KAFKA_IMAGE =
+ System.getProperty("druid.testing.kafka.image", "apache/kafka:4.2.0");
+
+ private KafkaContainer kafkaServer;
private Injector injector;
private ObjectMapper mapper;
private KafkaLookupExtractorFactory factory;
@@ -95,23 +91,9 @@ public class TestKafkaExtractionCluster
@Before
public void setUp() throws Exception
{
- zkServer = new TestingCluster(1);
- zkServer.start();
- closer.register(() -> {
- zkServer.stop();
- });
-
- kafkaServer = new KafkaServer(
- getBrokerProperties(),
- Time.SYSTEM,
- Some.apply(StringUtils.format("TestingBroker[%d]-", 1)),
- false);
-
- kafkaServer.startup();
- closer.register(() -> {
- kafkaServer.shutdown();
- kafkaServer.awaitShutdown();
- });
+ kafkaServer = new KafkaContainer(KAFKA_IMAGE);
+ kafkaServer.start();
+ closer.register(() -> kafkaServer.stop());
log.info("---------------------------Started Kafka Broker
---------------------------");
log.info("---------------------------Publish Messages to
topic-----------------------");
@@ -163,8 +145,7 @@ public class TestKafkaExtractionCluster
private Map<String, String> getConsumerProperties()
{
final Map<String, String> props = new HashMap<>(KAFKA_PROPERTIES);
- int port = kafkaServer.advertisedListeners().apply(0).port();
- props.put("bootstrap.servers", StringUtils.format("127.0.0.1:%d", port));
+ props.put("bootstrap.servers", kafkaServer.getBootstrapServers());
return props;
}
@@ -177,26 +158,6 @@ public class TestKafkaExtractionCluster
}
}
- @Nonnull
- private KafkaConfig getBrokerProperties() throws IOException
- {
- final Properties serverProperties = new Properties();
- serverProperties.putAll(KAFKA_PROPERTIES);
- serverProperties.put("broker.id", "0");
- serverProperties.put("zookeeper.connect", zkServer.getConnectString());
- serverProperties.put("port",
String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
- serverProperties.put("auto.create.topics.enable", "true");
- serverProperties.put("log.dir",
temporaryFolder.newFolder().getAbsolutePath());
- serverProperties.put("num.partitions", "1");
- serverProperties.put("offsets.topic.replication.factor", "1");
- serverProperties.put("default.replication.factor", "1");
- serverProperties.put("log.cleaner.enable", "true");
- serverProperties.put("advertised.host.name", "localhost");
- serverProperties.put("zookeeper.session.timeout.ms", "30000");
- serverProperties.put("zookeeper.sync.time.ms", "200");
- return new KafkaConfig(serverProperties);
- }
-
@After
public void tearDown() throws Exception
{
@@ -207,8 +168,7 @@ public class TestKafkaExtractionCluster
{
final Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.putAll(KAFKA_PROPERTIES);
- int port = kafkaServer.advertisedListeners().apply(0).port();
- kafkaProducerProperties.put("bootstrap.servers",
StringUtils.format("127.0.0.1:%d", port));
+ kafkaProducerProperties.put("bootstrap.servers",
kafkaServer.getBootstrapServers());
kafkaProducerProperties.put("key.serializer",
ByteArraySerializer.class.getName());
kafkaProducerProperties.put("value.serializer",
ByteArraySerializer.class.getName());
kafkaProducerProperties.put("acks", "all");
diff --git a/extensions-core/kafka-indexing-service/pom.xml
b/extensions-core/kafka-indexing-service/pom.xml
index 2acfc2877d7..f86138c8742 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -151,9 +151,8 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.13</artifactId>
- <version>${apache.kafka.version}</version>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -190,22 +189,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-test</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
- <dependency><!-- Required by dependency check, used by kafka_2.13 -->
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.library.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java
index 913c1aca1ad..bea3b8de0f5 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java
@@ -36,6 +36,7 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
public class KafkaStringHeaderFormatTest
@@ -100,10 +101,10 @@ public class KafkaStringHeaderFormatTest
{
String headerLabelPrefix = "test.kafka.header.";
Headers headers = new RecordHeaders(SAMPLE_HEADERS);
- inputEntity = new KafkaRecordEntity(new ConsumerRecord<>(
+ inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
"sample", 0, 0, timestamp,
- null, null, 0, 0,
- null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers
+ null, 0, 0,
+ null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers,
Optional.empty()
));
List<Pair<String, Object>> expectedResults = Arrays.asList(
Pair.of("test.kafka.header.encoding", "application/json"),
@@ -151,10 +152,10 @@ public class KafkaStringHeaderFormatTest
String headerLabelPrefix = "test.kafka.header.";
Headers headers = new RecordHeaders(header);
- inputEntity = new KafkaRecordEntity(new ConsumerRecord<>(
+ inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
"sample", 0, 0, timestamp,
- null, null, 0, 0,
- null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers
+ null, 0, 0,
+ null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers,
Optional.empty()
));
List<Pair<String, Object>> expectedResults = Arrays.asList(
Pair.of("test.kafka.header.encoding", "application/json"),
@@ -203,10 +204,10 @@ public class KafkaStringHeaderFormatTest
String headerLabelPrefix = "test.kafka.header.";
Headers headers = new RecordHeaders(header);
- inputEntity = new KafkaRecordEntity(new ConsumerRecord<>(
+ inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
"sample", 0, 0, timestamp,
- null, null, 0, 0,
- null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers
+ null, 0, 0,
+ null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers,
Optional.empty()
));
List<Pair<String, Object>> expectedResults = Arrays.asList(
Pair.of("test.kafka.header.encoding", "?pplic?tion/json"),
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index e3d98588d0a..bb40602be7b 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -37,7 +37,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Injector;
import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingCluster;
import org.apache.druid.cli.CliPeon;
import org.apache.druid.cli.CliPeonTest;
import org.apache.druid.cli.PeonLoadSpecHolder;
@@ -73,7 +72,7 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
-import org.apache.druid.indexing.kafka.test.TestBroker;
+import org.apache.druid.indexing.kafka.test.EmbeddedKafkaBroker;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -198,8 +197,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
"kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic",
"kafka.partition", "kafka.offset"
);
- private static TestingCluster zkServer;
- private static TestBroker kafkaServer;
+ private static EmbeddedKafkaBroker kafkaServer;
private static int topicPostfix;
static final Module TEST_MODULE = new
SimpleModule("kafkaTestModule").registerSubtypes(
new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat"),
@@ -288,17 +286,9 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
}
@BeforeClass
- public static void setupClass() throws Exception
+ public static void setupClass()
{
- zkServer = new TestingCluster(1);
- zkServer.start();
-
- kafkaServer = new TestBroker(
- zkServer.getConnectString(),
- null,
- 1,
- ImmutableMap.of("num.partitions", "2")
- );
+ kafkaServer = new
EmbeddedKafkaBroker(ImmutableMap.of("KAFKA_NUM_PARTITIONS", "2"));
kafkaServer.start();
taskExec = MoreExecutors.listeningDecorator(
@@ -345,9 +335,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
kafkaServer.close();
kafkaServer = null;
-
- zkServer.stop();
- zkServer = null;
}
@Test(timeout = 60_000L)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 6b1b51474c7..a01ae639c04 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -24,11 +24,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
-import org.apache.curator.test.TestingCluster;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
-import org.apache.druid.indexing.kafka.test.TestBroker;
+import org.apache.druid.indexing.kafka.test.EmbeddedKafkaBroker;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.StringUtils;
@@ -79,8 +78,7 @@ public class KafkaRecordSupplierTest
private static String TOPIC = "topic";
private static int TOPIC_POS_FIX = 0;
- private static TestingCluster ZK_SERVER;
- private static TestBroker KAFKA_SERVER;
+ private static EmbeddedKafkaBroker KAFKA_SERVER;
private List<ProducerRecord<byte[], byte[]>> records;
@@ -199,19 +197,10 @@ public class KafkaRecordSupplierTest
}
@BeforeClass
- public static void setupClass() throws Exception
+ public static void setupClass()
{
- ZK_SERVER = new TestingCluster(1);
- ZK_SERVER.start();
-
- KAFKA_SERVER = new TestBroker(
- ZK_SERVER.getConnectString(),
- null,
- 1,
- ImmutableMap.of("num.partitions", "2")
- );
+ KAFKA_SERVER = new
EmbeddedKafkaBroker(ImmutableMap.of("KAFKA_NUM_PARTITIONS", "2"));
KAFKA_SERVER.start();
-
}
@Before
@@ -222,13 +211,10 @@ public class KafkaRecordSupplierTest
}
@AfterClass
- public static void tearDownClass() throws Exception
+ public static void tearDownClass()
{
KAFKA_SERVER.close();
KAFKA_SERVER = null;
-
- ZK_SERVER.stop();
- ZK_SERVER = null;
}
@Test
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index dc9c7186815..444f10013e1 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import org.apache.curator.test.TestingCluster;
import org.apache.druid.client.indexing.SamplerResponse;
import org.apache.druid.client.indexing.SamplerSpec;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
@@ -33,7 +32,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
-import org.apache.druid.indexing.kafka.test.TestBroker;
+import org.apache.druid.indexing.kafka.test.EmbeddedKafkaBroker;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.overlord.sampler.SamplerException;
@@ -98,8 +97,7 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
schema.withTimestamp(new TimestampSpec("kafka.timestamp", "iso",
null));
};
- private static TestingCluster zkServer;
- private static TestBroker kafkaServer;
+ private static EmbeddedKafkaBroker kafkaServer;
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String
topic)
{
@@ -114,20 +112,16 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
}
@BeforeClass
- public static void setupClass() throws Exception
+ public static void setupClass()
{
- zkServer = new TestingCluster(1);
- zkServer.start();
-
- kafkaServer = new TestBroker(zkServer.getConnectString(), null, 1,
ImmutableMap.of("num.partitions", "2"));
+ kafkaServer = new
EmbeddedKafkaBroker(ImmutableMap.of("KAFKA_NUM_PARTITIONS", "2"));
kafkaServer.start();
}
@AfterClass
- public static void tearDownClass() throws Exception
+ public static void tearDownClass()
{
kafkaServer.close();
- zkServer.stop();
}
@Test
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
index 2df7c6a7be1..27c5fcb65bd 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
@@ -54,7 +54,7 @@ public class KafkaResource extends
StreamIngestResource<KafkaContainer>
* defaults to {@code apache/kafka}. Environments that cannot run that
* image should set the system property to {@code apache/kafka-native}.
*/
- private static final String KAFKA_IMAGE =
System.getProperty("druid.testing.kafka.image", "apache/kafka:4.1.1");
+ private static final String KAFKA_IMAGE =
System.getProperty("druid.testing.kafka.image", "apache/kafka:4.2.0");
private EmbeddedDruidCluster cluster;
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c629aa46192..993582693cb 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.curator.test.TestingCluster;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.JsonInputFormat;
@@ -50,7 +49,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig;
import org.apache.druid.indexing.kafka.KafkaIndexTaskRunner;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
-import org.apache.druid.indexing.kafka.test.TestBroker;
+import org.apache.druid.indexing.kafka.test.EmbeddedKafkaBroker;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
@@ -122,7 +121,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@@ -160,8 +158,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
- private static TestingCluster zkServer;
- private static TestBroker kafkaServer;
+ private static EmbeddedKafkaBroker kafkaServer;
private static String kafkaHost;
private static DataSchema dataSchema;
private static int topicPostfix;
@@ -207,24 +204,21 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
@BeforeClass
- public static void setupClass() throws Exception
+ public static void setupClass()
{
- zkServer = new TestingCluster(1);
- zkServer.start();
-
- kafkaServer = new TestBroker(
- zkServer.getConnectString(),
- null,
- 1,
+ kafkaServer = new EmbeddedKafkaBroker(
ImmutableMap.of(
- "num.partitions",
+ "KAFKA_NUM_PARTITIONS",
String.valueOf(NUM_PARTITIONS),
- "auto.create.topics.enable",
- String.valueOf(false)
+ "KAFKA_AUTO_CREATE_TOPICS_ENABLE",
+ String.valueOf(false),
+ // Kafka 4.x defaults log.message.timestamp.after.max.ms to 1h; addSomeEvents emits future timestamps.
+ "KAFKA_LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS",
+ String.valueOf(Long.MAX_VALUE)
)
);
kafkaServer.start();
- kafkaHost = StringUtils.format("localhost:%d", kafkaServer.getPort());
+ kafkaHost = kafkaServer.getBootstrapServerUrl();
dataSchema = getDataSchema(DATASOURCE);
}
@@ -258,13 +252,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
@AfterClass
- public static void tearDownClass() throws IOException
+ public static void tearDownClass()
{
kafkaServer.close();
kafkaServer = null;
-
- zkServer.stop();
- zkServer = null;
}
@Test
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/EmbeddedKafkaBroker.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/EmbeddedKafkaBroker.java
new file mode 100644
index 00000000000..c223337decb
--- /dev/null
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/EmbeddedKafkaBroker.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.test;
+
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.testcontainers.kafka.KafkaContainer;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Embedded Kafka broker for unit tests, backed by Testcontainers {@link KafkaContainer}.
+ */
+public class EmbeddedKafkaBroker implements Closeable
+{
+ private static final String KAFKA_IMAGE =
+ System.getProperty("druid.testing.kafka.image", "apache/kafka:4.2.0");
+
+ private final KafkaContainer container;
+
+ public EmbeddedKafkaBroker()
+ {
+ this(Map.of());
+ }
+
+ /**
+ * @param brokerEnv extra env vars forwarded to the container; the apache/kafka
+ * image maps {@code KAFKA_FOO_BAR} to broker config {@code foo.bar}.
+ */
+ public EmbeddedKafkaBroker(Map<String, String> brokerEnv)
+ {
+ this.container = new KafkaContainer(KAFKA_IMAGE);
+ if (brokerEnv != null && !brokerEnv.isEmpty()) {
+ container.withEnv(brokerEnv);
+ }
+ }
+
+ public void start()
+ {
+ container.start();
+ }
+
+ public String getBootstrapServerUrl()
+ {
+ return container.getBootstrapServers();
+ }
+
+ public int getPort()
+ {
+ final String url = getBootstrapServerUrl();
+ final int colon = url.lastIndexOf(':');
+ return Integer.parseInt(url.substring(colon + 1));
+ }
+
+ public Map<String, Object> producerProperties()
+ {
+ final Map<String, Object> props = new HashMap<>();
+ commonClientProperties(props);
+ props.put("key.serializer", ByteArraySerializer.class.getName());
+ props.put("value.serializer", ByteArraySerializer.class.getName());
+ props.put("acks", "all");
+ props.put("enable.idempotence", "true");
+ props.put("transactional.id",
String.valueOf(ThreadLocalRandom.current().nextInt()));
+ return props;
+ }
+
+ public KafkaProducer<byte[], byte[]> newProducer()
+ {
+ return new KafkaProducer<>(producerProperties());
+ }
+
+ public Map<String, Object> consumerProperties()
+ {
+ final Map<String, Object> props = new
HashMap<>(KafkaConsumerConfigs.getConsumerProperties());
+ props.put("bootstrap.servers", getBootstrapServerUrl());
+ return props;
+ }
+
+ public Admin newAdminClient()
+ {
+ final Map<String, Object> props = new HashMap<>();
+ commonClientProperties(props);
+ return Admin.create(props);
+ }
+
+ private void commonClientProperties(Map<String, Object> props)
+ {
+ props.put("bootstrap.servers", getBootstrapServerUrl());
+ }
+
+ @Override
+ public void close()
+ {
+ container.stop();
+ }
+}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
deleted file mode 100644
index 387bba88d48..00000000000
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.kafka.test;
-
-import com.google.common.collect.ImmutableMap;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
-import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.Time;
-import scala.Some;
-
-import javax.annotation.Nullable;
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-public class TestBroker implements Closeable
-{
- private static final Random RANDOM = ThreadLocalRandom.current();
- private final String zookeeperConnect;
- private final File directory;
- private final boolean directoryCleanup;
- private final int id;
- private final Map<String, Object> brokerProps;
-
- private volatile KafkaServer server;
-
- public TestBroker(
- String zookeeperConnect,
- @Nullable File directory,
- int id,
- Map<String, Object> brokerProps
- )
- {
- this.zookeeperConnect = zookeeperConnect;
- this.directory = directory == null ? FileUtils.createTempDir() : directory;
- this.directoryCleanup = directory == null;
- this.id = id;
- this.brokerProps = brokerProps == null ? ImmutableMap.of() : brokerProps;
- }
-
- public void start()
- {
- final Properties props = new Properties();
- props.setProperty("zookeeper.connect", zookeeperConnect);
- props.setProperty("zookeeper.session.timeout.ms", "30000");
- props.setProperty("zookeeper.connection.timeout.ms", "30000");
- props.setProperty("log.dirs", directory.toString());
- props.setProperty("broker.id", String.valueOf(id));
- props.setProperty("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
- props.setProperty("advertised.host.name", "localhost");
- props.setProperty("transaction.state.log.replication.factor", "1");
- props.setProperty("offsets.topic.replication.factor", "1");
- props.setProperty("transaction.state.log.min.isr", "1");
- props.putAll(brokerProps);
-
- final KafkaConfig config = new KafkaConfig(props);
-
- server = new KafkaServer(
- config,
- Time.SYSTEM,
- Some.apply(StringUtils.format("TestingBroker[%d]-", id)),
- false
- );
- server.startup();
- }
-
- public int getPort()
- {
- return server.advertisedListeners().apply(0).port();
- }
-
- public KafkaProducer<byte[], byte[]> newProducer()
- {
- return new KafkaProducer<>(producerProperties());
- }
-
- public Admin newAdminClient()
- {
- return Admin.create(adminClientProperties());
- }
-
- Map<String, Object> adminClientProperties()
- {
- final Map<String, Object> props = new HashMap<>();
- commonClientProperties(props);
- return props;
- }
-
- public Map<String, Object> producerProperties()
- {
- final Map<String, Object> props = new HashMap<>();
- commonClientProperties(props);
- props.put("key.serializer", ByteArraySerializer.class.getName());
- props.put("value.serializer", ByteArraySerializer.class.getName());
- props.put("acks", "all");
- props.put("enable.idempotence", "true");
- props.put("transactional.id", String.valueOf(RANDOM.nextInt()));
- return props;
- }
-
- void commonClientProperties(Map<String, Object> props)
- {
- props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
- }
-
- public Map<String, Object> consumerProperties()
- {
- final Map<String, Object> props = KafkaConsumerConfigs.getConsumerProperties();
- props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
- return props;
- }
-
- @Override
- public void close() throws IOException
- {
- if (server != null) {
- server.shutdown();
- server.awaitShutdown();
- }
- if (directoryCleanup) {
- FileUtils.deleteDirectory(directory);
- }
- }
-}
diff --git a/licenses.yaml b/licenses.yaml
index 3ec887ec5c4..3b76c7490b4 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -3414,7 +3414,7 @@ libraries:
---
name: Apache Kafka
-version: 3.9.2
+version: 4.2.0
license_category: binary
module: extensions/druid-kafka-indexing-service
license_name: Apache License version 2.0
@@ -4418,7 +4418,7 @@ name: Apache Kafka
license_category: binary
module: extensions/kafka-extraction-namespace
license_name: Apache License version 2.0
-version: 3.6.1
+version: 4.2.0
libraries:
- org.apache.kafka: kafka-clients
notices:
@@ -4449,17 +4449,6 @@ notices:
---
-name: Scala Library
-license_category: binary
-module: extensions/kafka-extraction-namespace
-license_name: Apache License version 2.0
-copyright: LAMP/EPFL and Lightbend, Inc.
-version: 2.13.16
-libraries:
- - org.scala-lang: scala-library
-
----
-
name: Microsoft Azure SDK For Key Vault Core
license_category: binary
module: extensions/druid-azure-extensions
diff --git a/pom.xml b/pom.xml
index 7c3752b3181..bf1da17afd6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
<project.build.resourceEncoding>UTF-8</project.build.resourceEncoding>
<aether.version>0.9.0.M2</aether.version>
<apache.curator.version>5.8.0</apache.curator.version>
- <apache.kafka.version>3.9.2</apache.kafka.version>
+ <apache.kafka.version>4.2.0</apache.kafka.version>
<!-- when updating apache ranger, verify the usage of aws-bundle-sdk vs aws-logs-sdk
and update as needed in extensions-contrib/druid-ranger-security/pom.xml -->
<apache.ranger.version>2.8.0</apache.ranger.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]