This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a5eae57fc81 Add Kafka 4.x client support (#17633)
a5eae57fc81 is described below
commit a5eae57fc81fa7232e1199d45b9b19817b0e611c
Author: Arunkumar Saravanan <[email protected]>
AuthorDate: Fri Feb 6 12:47:42 2026 +0530
Add Kafka 4.x client support (#17633)
---
pinot-integration-tests/pom.xml | 5 +
.../LLCRealtimeKafka4ClusterIntegrationTest.java | 142 ++++++
.../pinot-stream-ingestion/pinot-kafka-4.0/pom.xml | 84 ++++
.../stream/kafka40/KafkaConsumerFactory.java | 70 +++
.../KafkaPartitionLevelConnectionHandler.java | 234 +++++++++
.../kafka40/KafkaPartitionLevelConsumer.java | 136 ++++++
.../kafka40/KafkaStreamMetadataProvider.java | 285 +++++++++++
.../SynchronizedKafkaStreamMetadataProvider.java | 44 ++
.../kafka40/KafkaAdminClientManagerTest.java | 130 +++++
.../KafkaPartitionLevelConnectionHandlerTest.java | 103 ++++
.../kafka40/KafkaPartitionLevelConsumerTest.java | 534 +++++++++++++++++++++
.../KafkaPartitionLevelStreamConfigTest.java | 203 ++++++++
.../stream/kafka40/utils/MiniKafkaCluster.java | 113 +++++
.../pinot-kafka-4.0/src/test/resources/log4j2.xml | 36 ++
pinot-plugins/pinot-stream-ingestion/pom.xml | 1 +
.../org/apache/pinot/spi/plugin/PluginManager.java | 1 +
pom.xml | 6 +
17 files changed, 2127 insertions(+)
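
For readers trying the new plugin, the consumer factory added below is selected through the standard Pinot stream configuration. The following is a minimal sketch, not part of the commit itself: the property keys mirror the ones used in the new unit tests, while the table name, topic name, broker address and decoder class are placeholders.

    // Sketch only: values are placeholders; keys follow the new pinot-kafka-4.0 tests.
    Map<String, String> streamConfigMap = new HashMap<>();
    streamConfigMap.put("streamType", "kafka");
    streamConfigMap.put("stream.kafka.topic.name", "myTopic");
    streamConfigMap.put("stream.kafka.broker.list", "localhost:9092");
    streamConfigMap.put("stream.kafka.consumer.factory.class.name",
        "org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory");
    streamConfigMap.put("stream.kafka.decoder.class.name", "yourDecoderClass");
    StreamConfig streamConfig = new StreamConfig("myTable_REALTIME", streamConfigMap);
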
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index c68d88c3cf6..7af0bfa0c06 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -253,6 +253,11 @@
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-kafka-4.0</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka4ClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka4ClusterIntegrationTest.java
new file mode 100644
index 00000000000..7b101d49d52
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka4ClusterIntegrationTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory;
+import org.apache.pinot.plugin.stream.kafka40.KafkaPartitionLevelConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test for low-level Kafka 4.x consumer.
+ */
+public class LLCRealtimeKafka4ClusterIntegrationTest extends LLCRealtimeClusterIntegrationTest {
+
+ @Override
+ protected Map<String, String> getStreamConfigMap() {
+ Map<String, String> streamConfigMap = super.getStreamConfigMap();
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(
+ streamConfigMap.get(StreamConfigProperties.STREAM_TYPE),
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), ExceptingKafka4ConsumerFactory.class.getName());
+ ExceptingKafka4ConsumerFactory.init(getHelixClusterName(), _helixAdmin, getTableName());
+ return streamConfigMap;
+ }
+
+ public static class ExceptingKafka4ConsumerFactory extends KafkaConsumerFactory {
+
+ public static final int PARTITION_FOR_EXCEPTIONS = 1; // Setting this to -1 disables all exceptions thrown.
+ public static final int SEQ_NUM_FOR_CREATE_EXCEPTION = 1;
+ public static final int SEQ_NUM_FOR_CONSUME_EXCEPTION = 3;
+
+ private static HelixAdmin _helixAdmin;
+ private static String _helixClusterName;
+ private static String _tableName;
+ public ExceptingKafka4ConsumerFactory() {
+ super();
+ }
+
+ public static void init(String helixClusterName, HelixAdmin helixAdmin, String tableName) {
+ _helixAdmin = helixAdmin;
+ _helixClusterName = helixClusterName;
+ _tableName = tableName;
+ }
+
+ @Override
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+ PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
+ /*
+ * The segment data manager is creating a consumer to consume rows into a segment.
+ * Check the partition and sequence number of the segment and decide whether it
+ * qualifies for:
+ * - Throwing exception during create OR
+ * - Throwing exception during consumption.
+ * Make sure that this still works if retries are added in RealtimeSegmentDataManager
+ */
+ int partition = partitionGroupConsumptionStatus.getPartitionGroupId();
+ boolean exceptionDuringConsume = false;
+ int seqNum = getSegmentSeqNum(partition);
+ if (partition == PARTITION_FOR_EXCEPTIONS) {
+ if (seqNum == SEQ_NUM_FOR_CREATE_EXCEPTION) {
+ throw new RuntimeException("TestException during consumer creation");
+ } else if (seqNum == SEQ_NUM_FOR_CONSUME_EXCEPTION) {
+ exceptionDuringConsume = true;
+ }
+ }
+ return new ExceptingKafka4Consumer(clientId, _streamConfig, partition, exceptionDuringConsume);
+ }
+
+ @Override
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+ PartitionGroupConsumptionStatus partitionGroupConsumptionStatus, RetryPolicy retryPolicy) {
+ return createPartitionGroupConsumer(clientId, partitionGroupConsumptionStatus);
+ }
+
+ private int getSegmentSeqNum(int partition) {
+ IdealState is = _helixAdmin.getResourceIdealState(_helixClusterName,
+ TableNameBuilder.REALTIME.tableNameWithType(_tableName));
+ AtomicInteger seqNum = new AtomicInteger(-1);
+ is.getPartitionSet().forEach(segmentNameStr -> {
+ if (LLCSegmentName.isLLCSegment(segmentNameStr)) {
+ if (is.getInstanceStateMap(segmentNameStr).values().contains(
+ CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) {
+ LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+ if (segmentName.getPartitionGroupId() == partition) {
+ seqNum.set(segmentName.getSequenceNumber());
+ }
+ }
+ }
+ });
+ assertTrue(seqNum.get() >= 0, "No consuming segment found in partition: " + partition);
+ return seqNum.get();
+ }
+
+ public static class ExceptingKafka4Consumer extends KafkaPartitionLevelConsumer {
+ private final boolean _exceptionDuringConsume;
+
+ public ExceptingKafka4Consumer(String clientId, StreamConfig streamConfig, int partition,
+ boolean exceptionDuringConsume) {
+ super(clientId, streamConfig, partition);
+ _exceptionDuringConsume = exceptionDuringConsume;
+ }
+
+ @Override
+ public KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs) {
+ if (_exceptionDuringConsume) {
+ throw new RuntimeException("TestException during consumption");
+ }
+ return super.fetchMessages(startOffset, timeoutMs);
+ }
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/pom.xml
new file mode 100644
index 00000000000..68df0ed23ba
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>pinot-stream-ingestion</artifactId>
+ <groupId>org.apache.pinot</groupId>
+ <version>1.5.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pinot-kafka-4.0</artifactId>
+ <name>Pinot Kafka 4.x</name>
+ <url>https://pinot.apache.org/</url>
+ <properties>
+ <pinot.root>${basedir}/../../..</pinot.root>
+ <shade.phase.prop>package</shade.phase.prop>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-kafka-base</artifactId>
+ </dependency>
+
+ <!-- Kafka 4.x client (pure Java, no Scala dependency) -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka4.version}</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-kafka-base</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- Testcontainers for embedded Kafka testing with KRaft mode -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>pinot-fastdev</id>
+ <properties>
+ <shade.phase.prop>none</shade.phase.prop>
+ </properties>
+ </profile>
+ </profiles>
+</project>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaConsumerFactory.java
new file mode 100644
index 00000000000..c1eec69aa5e
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaConsumerFactory.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import org.apache.pinot.plugin.stream.kafka.KafkaConfigBackwardCompatibleUtils;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+
+
+public class KafkaConsumerFactory extends StreamConsumerFactory {
+
+ @Override
+ protected void init(StreamConfig streamConfig) {
+ KafkaConfigBackwardCompatibleUtils.handleStreamConfig(streamConfig);
+ super.init(streamConfig);
+ }
+
+ @Override
+ public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+ return new KafkaStreamMetadataProvider(clientId, _streamConfig, partition);
+ }
+
+ @Override
+ public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+ return new KafkaStreamMetadataProvider(clientId, _streamConfig);
+ }
+
+ @Override
+ public StreamMetadataProvider createStreamMetadataProvider(String clientId, boolean concurrentAccessExpected) {
+ if (concurrentAccessExpected) {
+ return new SynchronizedKafkaStreamMetadataProvider(clientId, _streamConfig);
+ } else {
+ return createStreamMetadataProvider(clientId);
+ }
+ }
+
+ @Override
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+ PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
+ return new KafkaPartitionLevelConsumer(clientId, _streamConfig,
+ partitionGroupConsumptionStatus.getStreamPartitionGroupId());
+ }
+
+ @Override
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+ PartitionGroupConsumptionStatus partitionGroupConsumptionStatus, RetryPolicy retryPolicy) {
+ return new KafkaPartitionLevelConsumer(clientId, _streamConfig,
+ partitionGroupConsumptionStatus.getStreamPartitionGroupId(), retryPolicy);
+ }
+}
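
As a usage sketch (not part of this change), the factory above is normally obtained through Pinot's StreamConsumerFactoryProvider rather than constructed directly; the provider entry point is the one imported by the new tests, and the client id and partition values below are arbitrary.

    // Sketch: resolve the factory from a StreamConfig that names KafkaConsumerFactory (see above).
    StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfig);
    // Topic-level metadata (partition counts, offsets):
    StreamMetadataProvider topicMetadataProvider = factory.createStreamMetadataProvider("myClient");
    // Partition-level metadata for partition 0:
    StreamMetadataProvider partitionMetadataProvider = factory.createPartitionMetadataProvider("myClient", 0);
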
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandler.java
new file mode 100644
index 00000000000..36c24f43771
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandler.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.KafkaAdminClientManager;
+import org.apache.pinot.plugin.stream.kafka.KafkaPartitionLevelStreamConfig;
+import org.apache.pinot.plugin.stream.kafka.KafkaSSLUtils;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * KafkaPartitionLevelConnectionHandler provides low-level APIs to access Kafka partition-level information,
+ * e.g. partition counts and offsets per partition.
+ *
+ */
+public abstract class KafkaPartitionLevelConnectionHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConnectionHandler.class);
+ private static final Set<String> CONSUMER_CONFIG_NAMES = ConsumerConfig.configNames();
+ private static final Set<String> ADMIN_CLIENT_CONFIG_NAMES = AdminClientConfig.configNames();
+ protected final KafkaPartitionLevelStreamConfig _config;
+ protected final String _clientId;
+ protected final int _partition;
+ protected final String _topic;
+ protected final Consumer<String, Bytes> _consumer;
+ protected final TopicPartition _topicPartition;
+ protected final Properties _consumerProp;
+ protected volatile KafkaAdminClientManager.AdminClientReference _sharedAdminClientRef;
+
+ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
+ this(clientId, streamConfig, partition, null);
+ }
+
+ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition,
+ @Nullable RetryPolicy retryPolicy) {
+ _config = new KafkaPartitionLevelStreamConfig(streamConfig);
+ _clientId = clientId;
+ _partition = partition;
+ _topic = _config.getKafkaTopicName();
+ _consumerProp = buildProperties(streamConfig);
+ KafkaSSLUtils.initSSL(_consumerProp);
+ if (retryPolicy == null) {
+ _consumer = createConsumer(_consumerProp);
+ } else {
+ _consumer = createConsumer(_consumerProp, retryPolicy);
+ }
+ _topicPartition = new TopicPartition(_topic, _partition);
+ _consumer.assign(Collections.singletonList(_topicPartition));
+ }
+
+ private Properties buildProperties(StreamConfig streamConfig) {
+ Properties consumerProp = new Properties();
+ consumerProp.putAll(streamConfig.getStreamConfigsMap());
+ consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
+ consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+ if (_config.getKafkaIsolationLevel() != null) {
+ consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
+ }
+ consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
+ return consumerProp;
+ }
+
+ /**
+ * Filter properties to only include the specified Kafka configurations.
+ * This prevents "was supplied but isn't a known config" warnings from Kafka clients.
+ *
+ * @param props The properties to filter
+ * @param validConfigNames The set of valid configuration names for the target Kafka client
+ * @return A new Properties object containing only the valid configurations
+ */
+ private Properties filterKafkaProperties(Properties props, Set<String> validConfigNames) {
+ Properties filteredProps = new Properties();
+ for (String key : props.stringPropertyNames()) {
+ if (validConfigNames.contains(key)) {
+ filteredProps.put(key, props.get(key));
+ }
+ }
+ return filteredProps;
+ }
+
+ private Consumer<String, Bytes> createConsumer(Properties consumerProp, RetryPolicy retryPolicy) {
+ AtomicReference<Consumer<String, Bytes>> consumer = new AtomicReference<>();
+ try {
+ retryPolicy.attempt(() -> {
+ try {
+ consumer.set(new KafkaConsumer<>(filterKafkaProperties(consumerProp, CONSUMER_CONFIG_NAMES)));
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while creating Kafka consumer, retrying.", e);
+ return false;
+ }
+ });
+ } catch (AttemptsExceededException | RetriableOperationException e) {
+ LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
+ throw new RuntimeException(e);
+ }
+ return consumer.get();
+ }
+
+ @VisibleForTesting
+ protected Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+ return retry(() -> new KafkaConsumer<>(filterKafkaProperties(consumerProp, CONSUMER_CONFIG_NAMES)), 5);
+ }
+
+ protected AdminClient createAdminClient() {
+ return retry(() -> AdminClient.create(filterKafkaProperties(_consumerProp, ADMIN_CLIENT_CONFIG_NAMES)), 5);
+ }
+
+ /**
+ * Gets or creates a reusable admin client instance. The admin client is lazily initialized
+ * and reused across multiple calls to avoid the overhead of creating new connections.
+ *
+ * @return the admin client instance
+ */
+ @Deprecated
+ protected AdminClient getOrCreateAdminClient() {
+ return createAdminClient();
+ }
+
+ /**
+ * Gets or creates a shared admin client instance that can be reused across multiple
+ * connection handlers connecting to the same Kafka cluster. This provides better
+ * resource efficiency when multiple consumers/producers connect to the same bootstrap servers.
+ *
+ * @return the shared admin client instance
+ */
+ protected AdminClient getOrCreateSharedAdminClient() {
+ return getOrCreateSharedAdminClientInternal(false);
+ }
+
+ private AdminClient getOrCreateSharedAdminClientInternal(boolean isRetry) {
+ KafkaAdminClientManager.AdminClientReference ref = _sharedAdminClientRef;
+ if (ref == null) {
+ synchronized (this) {
+ ref = _sharedAdminClientRef;
+ if (ref == null) {
+ ref = KafkaAdminClientManager.getInstance().getOrCreateAdminClient(_consumerProp);
+ _sharedAdminClientRef = ref;
+ }
+ }
+ }
+ try {
+ return ref.getAdminClient();
+ } catch (IllegalStateException e) {
+ if (isRetry) {
+ throw new RuntimeException("Failed to create admin client after retry", e);
+ }
+ // Reference was closed, retry once
+ synchronized (this) {
+ _sharedAdminClientRef = null;
+ }
+ return getOrCreateSharedAdminClientInternal(true);
+ }
+ }
+
+ private static <T> T retry(Supplier<T> s, int nRetries) {
+ // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
+ // We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
+ // like a good balance of not waiting too long for a retry, but also not retrying too many times.
+ int tries = 0;
+ while (true) {
+ try {
+ return s.get();
+ } catch (KafkaException e) {
+ tries++;
+ if (tries >= nRetries) {
+ LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
+ throw e;
+ }
+ LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, nRetries, e);
+ // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
+ // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table stuck in ERROR state.
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ public void close()
+ throws IOException {
+ _consumer.close();
+ if (_sharedAdminClientRef != null) {
+ _sharedAdminClientRef.close();
+ _sharedAdminClientRef = null;
+ }
+ }
+
+ @VisibleForTesting
+ public KafkaPartitionLevelStreamConfig getKafkaPartitionLevelStreamConfig() {
+ return _config;
+ }
+}
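
The filterKafkaProperties helper above works because the Kafka clients publish their known configuration keys. A small standalone illustration of that technique, using only the kafka-clients API with hypothetical property values:

    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ConfigFilterSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty("stream.kafka.topic.name", "myTopic"); // Pinot-specific key, unknown to the Kafka consumer
        Set<String> knownKeys = ConsumerConfig.configNames(); // every key the consumer recognizes
        Properties filtered = new Properties();
        for (String key : props.stringPropertyNames()) {
          if (knownKeys.contains(key)) {
            filtered.put(key, props.get(key));
          }
        }
        // Only bootstrap.servers survives, avoiding "was supplied but isn't a known config" warnings.
        System.out.println(filtered);
      }
    }
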
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumer.java
new file mode 100644
index 00000000000..589736ef444
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumer.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.BytesStreamMessage;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
+ implements PartitionGroupConsumer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
+
+ private long _lastFetchedOffset = -1;
+
+ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+ super(clientId, streamConfig, partition);
+ }
+
+ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition,
+ RetryPolicy retryPolicy) {
+ super(clientId, streamConfig, partition, retryPolicy);
+ }
+
+ @Override
+ public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
+ long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Polling partition: {}, startOffset: {}, timeout: {}ms", _topicPartition, startOffset, timeoutMs);
+ }
+ if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Seeking to offset: {}", startOffset);
+ }
+ _consumer.seek(_topicPartition, startOffset);
+ }
+
+ ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
+ List<ConsumerRecord<String, Bytes>> records = consumerRecords.records(_topicPartition);
+ List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size());
+ long firstOffset = -1;
+ long offsetOfNextBatch = startOffset;
+ StreamMessageMetadata lastMessageMetadata = null;
+ long batchSizeInBytes = 0;
+ if (!records.isEmpty()) {
+ firstOffset = records.get(0).offset();
+ _lastFetchedOffset = records.get(records.size() - 1).offset();
+ offsetOfNextBatch = _lastFetchedOffset + 1;
+ for (ConsumerRecord<String, Bytes> record : records) {
+ StreamMessageMetadata messageMetadata = extractMessageMetadata(record);
+ Bytes message = record.value();
+ if (message != null) {
+ String key = record.key();
+ byte[] keyBytes = key != null ? key.getBytes(StandardCharsets.UTF_8) : null;
+ filteredRecords.add(new BytesStreamMessage(keyBytes, message.get(), messageMetadata));
+ if (messageMetadata.getRecordSerializedSize() > 0) {
+ batchSizeInBytes += messageMetadata.getRecordSerializedSize();
+ }
+ } else if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Tombstone message at offset: {}", record.offset());
+ }
+ lastMessageMetadata = messageMetadata;
+ }
+ }
+
+ // In case read_committed is enabled, the messages consumed are not guaranteed to have consecutive offsets.
+ // TODO: A better solution would be to fetch earliest offset from topic and see if it is greater than startOffset.
+ // However, this would require an additional call to Kafka which we want to avoid.
+ boolean hasDataLoss = false;
+ if (_config.getKafkaIsolationLevel() == null || _config.getKafkaIsolationLevel()
+ .equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)) {
+ hasDataLoss = firstOffset > startOffset;
+ }
+ return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata,
+ hasDataLoss, batchSizeInBytes);
+ }
+
+ private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, Bytes> record) {
+ long timestamp = record.timestamp();
+ long offset = record.offset();
+
+ StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp)
+ .setOffset(new LongMsgOffset(offset), new LongMsgOffset(offset + 1))
+ .setSerializedValueSize(record.serializedValueSize());
+ if (_config.isPopulateMetadata()) {
+ Headers headers = record.headers();
+ if (headers != null) {
+ GenericRow headerGenericRow = new GenericRow();
+ for (Header header : headers.toArray()) {
+ headerGenericRow.putValue(header.key(), header.value());
+ }
+ builder.setHeaders(headerGenericRow);
+ }
+ builder.setMetadata(Map.of(KafkaStreamMessageMetadata.RECORD_TIMESTAMP_KEY, String.valueOf(timestamp),
+ KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, String.valueOf(offset),
+ KafkaStreamMessageMetadata.METADATA_PARTITION_KEY, String.valueOf(record.partition())));
+ }
+ return builder.build();
+ }
+}
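
As an illustration (not part of the commit), the consumer above can be driven directly with the constructor and fetchMessages signature shown in this file; streamConfig is assumed to be a Kafka 4.x StreamConfig like the sketch earlier in this mail, and getMessageCount() is assumed from Pinot's MessageBatch SPI.

    // Sketch: poll one batch from partition 0 starting at offset 0.
    static void pollOnce(StreamConfig streamConfig) throws java.io.IOException {
      KafkaPartitionLevelConsumer consumer = new KafkaPartitionLevelConsumer("myClient", streamConfig, 0);
      try {
        KafkaMessageBatch batch = consumer.fetchMessages(new LongMsgOffset(0), 5_000);
        System.out.println("Fetched " + batch.getMessageCount() + " messages");
      } finally {
        consumer.close(); // releases the underlying KafkaConsumer and any shared admin client reference
      }
    }
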
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
new file mode 100644
index 00000000000..032fbf66fe2
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.TransientConsumerException;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler
+ implements StreamMetadataProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);
+
+ public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+ this(clientId, streamConfig, Integer.MIN_VALUE);
+ }
+
+ public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
+ super(clientId, streamConfig, partition);
+ }
+
+ @Override
+ public int fetchPartitionCount(long timeoutMillis) {
+ try {
+ List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis));
+ if (CollectionUtils.isNotEmpty(partitionInfos)) {
+ return partitionInfos.size();
+ }
+ throw new RuntimeException(String.format("Failed to fetch partition information for topic: %s", _topic));
+ } catch (TimeoutException e) {
+ throw new TransientConsumerException(e);
+ }
+ }
+
+ @Override
+ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ try {
+ List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis));
+ if (CollectionUtils.isEmpty(partitionInfos)) {
+ throw new RuntimeException(String.format("Failed to fetch partition information for topic: %s", _topic));
+ }
+ Set<Integer> partitionIds = Sets.newHashSetWithExpectedSize(partitionInfos.size());
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ partitionIds.add(partitionInfo.partition());
+ }
+ return partitionIds;
+ } catch (TimeoutException e) {
+ throw new TransientConsumerException(e);
+ }
+ }
+
+ @Override
+ public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+ List<TopicPartition> topicPartitions = new ArrayList<>(partitionIds.size());
+ for (Integer streamPartition: partitionIds) {
+ topicPartitions.add(new TopicPartition(_topic, streamPartition));
+ }
+ try {
+ Map<TopicPartition, Long> topicPartitionToLatestOffsetMap =
+ _consumer.endOffsets(topicPartitions, Duration.ofMillis(timeoutMillis));
+
+ Map<Integer, StreamPartitionMsgOffset> partitionIdToLatestOffset =
+ new HashMap<>(topicPartitionToLatestOffsetMap.size());
+ for (Map.Entry<TopicPartition, Long> entry : topicPartitionToLatestOffsetMap.entrySet()) {
+ partitionIdToLatestOffset.put(entry.getKey().partition(), new LongMsgOffset(entry.getValue()));
+ }
+
+ return partitionIdToLatestOffset;
+ } catch (TimeoutException e) {
+ throw new TransientConsumerException(e);
+ }
+ }
+
+ @Override
+ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
+ Preconditions.checkNotNull(offsetCriteria);
+ long offset;
+ try {
+ if (offsetCriteria.isLargest()) {
+ offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+ .get(_topicPartition);
+ } else if (offsetCriteria.isSmallest()) {
+ offset =
+ _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+ .get(_topicPartition);
+ } else if (offsetCriteria.isPeriod()) {
+ OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+ Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
+ .get(_topicPartition);
+ if (offsetAndTimestamp == null) {
+ offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+ .get(_topicPartition);
+ LOGGER.warn(
+ "initial offset type is period and its value evaluates to null hence proceeding with offset {} for "
+ + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+ } else {
+ offset = offsetAndTimestamp.offset();
+ }
+ } else if (offsetCriteria.isTimestamp()) {
+ OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+ TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
+ if (offsetAndTimestamp == null) {
+ offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+ .get(_topicPartition);
+ LOGGER.warn(
+ "initial offset type is timestamp and its value evaluates to null hence proceeding with offset {} for "
+ + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+ } else {
+ offset = offsetAndTimestamp.offset();
+ }
+ } else {
+ throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+ }
+ return new LongMsgOffset(offset);
+ } catch (TimeoutException e) {
+ throw new TransientConsumerException(e);
+ }
+ }
+
+ @Override
+ public Map<String, PartitionLagState> getCurrentPartitionLagState(
+ Map<String, ConsumerPartitionState> currentPartitionStateMap) {
+ Map<String, PartitionLagState> perPartitionLag = new HashMap<>();
+ for (Map.Entry<String, ConsumerPartitionState> entry : currentPartitionStateMap.entrySet()) {
+ ConsumerPartitionState partitionState = entry.getValue();
+ // Compute records-lag
+ StreamPartitionMsgOffset currentOffset = partitionState.getCurrentOffset();
+ StreamPartitionMsgOffset upstreamLatest = partitionState.getUpstreamLatestOffset();
+ String offsetLagString = "UNKNOWN";
+
+ if (currentOffset instanceof LongMsgOffset && upstreamLatest instanceof LongMsgOffset) {
+ long offsetLag = ((LongMsgOffset) upstreamLatest).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
+ offsetLagString = String.valueOf(offsetLag);
+ }
+
+ // Compute record-availability
+ String availabilityLagMs = "UNKNOWN";
+ StreamMessageMetadata lastProcessedMessageMetadata = partitionState.getLastProcessedRowMetadata();
+ if (lastProcessedMessageMetadata != null && partitionState.getLastProcessedTimeMs() > 0) {
+ long availabilityLag =
+ partitionState.getLastProcessedTimeMs() - lastProcessedMessageMetadata.getRecordIngestionTimeMs();
+ availabilityLagMs = String.valueOf(availabilityLag);
+ }
+
+ perPartitionLag.put(entry.getKey(), new KafkaConsumerPartitionLag(offsetLagString, availabilityLagMs));
+ }
+ return perPartitionLag;
+ }
+
+ @Override
+ public List<TopicMetadata> getTopics() {
+ try {
+ AdminClient adminClient = getOrCreateSharedAdminClient();
+ ListTopicsResult result = adminClient.listTopics();
+ if (result == null) {
+ return Collections.emptyList();
+ }
+ return result.names()
+ .get()
+ .stream()
+ .map(topic -> new KafkaTopicMetadata().setName(topic))
+ .collect(Collectors.toList());
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean supportsOffsetLag() {
+ return true;
+ }
+
+ public static class KafkaTopicMetadata implements TopicMetadata {
+ private String _name;
+
+ public String getName() {
+ return _name;
+ }
+
+ public KafkaTopicMetadata setName(String name) {
+ _name = name;
+ return this;
+ }
+ }
+
+
+
+ @Override
+ public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis, long timeoutMillis) {
+ try {
+ OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Map.of(_topicPartition, timestampMillis),
+ Duration.ofMillis(timeoutMillis)).get(_topicPartition);
+ if (offsetAndTimestamp == null) {
+ return null;
+ }
+ return new LongMsgOffset(offsetAndTimestamp.offset());
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get offset at timestamp {} for partition {}", timestampMillis, partitionId, e);
+ return null;
+ }
+ }
+
+ @Override
+ public Map<String, StreamPartitionMsgOffset> getStreamStartOffsets() {
+ List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic);
+ Map<TopicPartition, Long> startOffsets = _consumer.beginningOffsets(
+ partitionInfos.stream()
+ .map(info -> new TopicPartition(_topic, info.partition()))
+ .collect(Collectors.toList()));
+ return startOffsets.entrySet().stream().collect(
+ Collectors.toMap(
+ entry -> String.valueOf(entry.getKey().partition()),
+ entry -> new LongMsgOffset(entry.getValue()),
+ (existingValue, newValue) -> newValue
+ ));
+ }
+
+ @Override
+ public Map<String, StreamPartitionMsgOffset> getStreamEndOffsets() {
+ List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic);
+ Map<TopicPartition, Long> endOffsets = _consumer.endOffsets(
+ partitionInfos.stream()
+ .map(info -> new TopicPartition(_topic, info.partition()))
+ .collect(Collectors.toList()));
+ return endOffsets.entrySet().stream().collect(
+ Collectors.toMap(
+ entry -> String.valueOf(entry.getKey().partition()),
+ entry -> new LongMsgOffset(entry.getValue()),
+ (existingValue, newValue) -> newValue
+ ));
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ super.close();
+ }
+}
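
A short usage sketch of the metadata provider above, exercising only methods defined in this file; the client id and timeouts are arbitrary.

    static void describeTopic(StreamConfig streamConfig) throws java.io.IOException {
      KafkaStreamMetadataProvider provider = new KafkaStreamMetadataProvider("myClient", streamConfig);
      try {
        int partitionCount = provider.fetchPartitionCount(10_000L);
        Set<Integer> partitionIds = provider.fetchPartitionIds(10_000L);
        Map<Integer, StreamPartitionMsgOffset> latestOffsets = provider.fetchLatestStreamOffset(partitionIds, 10_000L);
        System.out.println(partitionCount + " partitions, latest offsets: " + latestOffsets);
      } finally {
        provider.close();
      }
    }
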
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/SynchronizedKafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/SynchronizedKafkaStreamMetadataProvider.java
new file mode 100644
index 00000000000..9733b0e4add
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/SynchronizedKafkaStreamMetadataProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+/**
+ * A thread-safe variant of {@link KafkaStreamMetadataProvider} that synchronizes
+ * access to metadata fetch operations.
+ * This is particularly useful when a shared instance of {@link KafkaStreamMetadataProvider}
+ * is accessed concurrently by multiple threads.
+ */
+public class SynchronizedKafkaStreamMetadataProvider extends KafkaStreamMetadataProvider {
+
+ public SynchronizedKafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+ super(clientId, streamConfig);
+ }
+
+ @Override
+ public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+ synchronized (this) {
+ return super.fetchLatestStreamOffset(partitionIds, timeoutMillis);
+ }
+ }
+}
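
The synchronized variant above is what the factory hands out when concurrent access is expected; a brief sketch of how a caller opts into it via the boolean overload defined in KafkaConsumerFactory earlier in this diff (names are placeholders):

    // Sketch: request a provider that is safe to share across ingestion threads.
    StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfig);
    StreamMetadataProvider sharedProvider = factory.createStreamMetadataProvider("myClient", true);
    // Multiple threads may now call fetchLatestStreamOffset(...) on the same instance;
    // the Kafka 4.x implementation serializes those calls internally.
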
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaAdminClientManagerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaAdminClientManagerTest.java
new file mode 100644
index 00000000000..89ee1b8857a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaAdminClientManagerTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.util.Properties;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.pinot.plugin.stream.kafka.KafkaAdminClientManager;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class KafkaAdminClientManagerTest {
+
+ @Test
+ public void testGetInstance() {
+ KafkaAdminClientManager instance1 = KafkaAdminClientManager.getInstance();
+ KafkaAdminClientManager instance2 = KafkaAdminClientManager.getInstance();
+ assertSame(instance1, instance2, "Should return the same singleton instance");
+ }
+
+ @Test
+ public void testCreateCacheKey() {
+ Properties props1 = new Properties();
+ props1.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props1.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+
+ Properties props2 = new Properties();
+ props2.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props2.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+
+ Properties props3 = new Properties();
+ props3.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
+ props3.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+
+ KafkaAdminClientManager manager = KafkaAdminClientManager.getInstance();
+
+ // Use reflection to access private method for testing
+ try {
+ java.lang.reflect.Method createCacheKeyMethod =
+ KafkaAdminClientManager.class.getDeclaredMethod("createCacheKey", Properties.class);
+ createCacheKeyMethod.setAccessible(true);
+
+ String key1 = (String) createCacheKeyMethod.invoke(manager, props1);
+ String key2 = (String) createCacheKeyMethod.invoke(manager, props2);
+ String key3 = (String) createCacheKeyMethod.invoke(manager, props3);
+
+ assertEquals(key1, key2, "Same properties should produce same cache key");
+ assertNotEquals(key1, key3, "Different bootstrap servers should produce different cache keys");
+ } catch (Exception e) {
+ fail("Failed to test createCacheKey method", e);
+ }
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testGetOrCreateAdminClientWithoutBootstrapServers() {
+ Properties props = new Properties();
+ // No bootstrap servers set
+ KafkaAdminClientManager.getInstance().getOrCreateAdminClient(props);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testGetOrCreateAdminClientWithEmptyBootstrapServers() {
+ Properties props = new Properties();
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+ KafkaAdminClientManager.getInstance().getOrCreateAdminClient(props);
+ }
+
+ @Test
+ public void testAdminClientReferenceClosing() {
+ Properties props = new Properties();
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+
+ KafkaAdminClientManager manager = KafkaAdminClientManager.getInstance();
+
+ // This test focuses on the reference mechanism without actually connecting to Kafka
+ // We test that multiple calls with same properties don't throw exceptions
+ try {
+ KafkaAdminClientManager.AdminClientReference ref1 = manager.getOrCreateAdminClient(props);
+ assertNotNull(ref1, "Reference should not be null");
+
+ // Closing a reference should not throw an exception
+ ref1.close();
+
+ // Closing again should not throw an exception
+ ref1.close();
+ } catch (Exception e) {
+ // This is expected since we're not connecting to a real Kafka cluster
+ // The test is mainly to verify the reference counting mechanism doesn't crash
+ assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+ || e.getMessage().contains("timeout") || e.getMessage().contains("refused"));
+ }
+ }
+
+ @Test
+ public void testAdminClientReferenceStateAfterClose() {
+ Properties props = new Properties();
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+
+ KafkaAdminClientManager manager = KafkaAdminClientManager.getInstance();
+
+ try {
+ KafkaAdminClientManager.AdminClientReference ref = manager.getOrCreateAdminClient(props);
+ ref.close();
+
+ // Should throw IllegalStateException when trying to use closed reference
+ assertThrows(IllegalStateException.class, () -> ref.getAdminClient());
+ } catch (Exception e) {
+ // Expected when no real Kafka cluster is available
+ assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+ || e.getMessage().contains("timeout") || e.getMessage().contains("refused"));
+ }
+ }
+}
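
For reference, both the connection handler and the test above use the reference-counted admin client from pinot-kafka-base through a simple acquire/use/release pattern; the sketch below sticks to the calls that appear in this commit, with a placeholder broker address.

    // Sketch: acquire a shared AdminClient reference, use it, then release it.
    Properties props = new Properties();
    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    KafkaAdminClientManager.AdminClientReference ref =
        KafkaAdminClientManager.getInstance().getOrCreateAdminClient(props);
    try {
      AdminClient adminClient = ref.getAdminClient();
      // ... issue admin calls, e.g. adminClient.listTopics() ...
    } finally {
      ref.close(); // after this, getAdminClient() on the reference throws IllegalStateException (per the test above)
    }
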
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandlerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandlerTest.java
new file mode 100644
index 00000000000..a7a2ac9ee7a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandlerTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+public class KafkaPartitionLevelConnectionHandlerTest {
+
+ private static class TestableKafkaPartitionLevelConnectionHandler extends KafkaPartitionLevelConnectionHandler {
+ public TestableKafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
+ super(clientId, streamConfig, partition);
+ }
+ }
+
+ private StreamConfig createTestStreamConfig() {
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", "kafka");
+ streamConfigMap.put("stream.kafka.topic.name", "testTopic");
+ streamConfigMap.put("stream.kafka.broker.list", "localhost:9092");
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name", KafkaConsumerFactory.class.getName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ return new StreamConfig("testTable_REALTIME", streamConfigMap);
+ }
+
+ @Test
+ public void testSharedAdminClientReference() {
+ StreamConfig streamConfig = createTestStreamConfig();
+
+ try {
+ TestableKafkaPartitionLevelConnectionHandler handler =
+ new TestableKafkaPartitionLevelConnectionHandler("testClient", streamConfig, 0);
+
+ // Test that we can call getOrCreateSharedAdminClient multiple times
+ // without throwing exceptions (even though it may fail to connect)
+ try {
+ handler.getOrCreateSharedAdminClient();
+ handler.getOrCreateSharedAdminClient(); // Should reuse the same reference
+ } catch (Exception e) {
+ // Expected when no real Kafka cluster is available
+ assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+ || e.getMessage().contains("timeout") || e.getMessage().contains("refused")
+ || e.getCause() != null);
+ }
+
+ // Test that close doesn't throw exceptions
+ handler.close();
+ } catch (Exception e) {
+ // Expected when initializing without a real Kafka cluster
+ assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+ || e.getMessage().contains("timeout") || e.getMessage().contains("refused")
+ || e.getCause() != null);
+ }
+ }
+
+ @Test
+ public void testGetOrCreateAdminClientBackwardCompatibility() {
+ StreamConfig streamConfig = createTestStreamConfig();
+
+ try {
+ TestableKafkaPartitionLevelConnectionHandler handler =
+ new TestableKafkaPartitionLevelConnectionHandler("testClient", streamConfig, 0);
+
+ // Test that the backward compatibility method still works
+ try {
+ handler.getOrCreateAdminClient();
+ } catch (Exception e) {
+ // Expected when no real Kafka cluster is available
+ assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+ || e.getMessage().contains("timeout") || e.getMessage().contains("refused")
+ || e.getCause() != null);
+ }
+
+ handler.close();
+ } catch (Exception e) {
+ // Expected when initializing without a real Kafka cluster
+ assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+ || e.getMessage().contains("timeout") || e.getMessage().contains("refused")
+ || e.getCause() != null);
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
new file mode 100644
index 00000000000..f602d083963
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
@@ -0,0 +1,534 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.plugin.stream.kafka40.utils.MiniKafkaCluster;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.retry.ExponentialBackoffRetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testng.Assert;
+import org.testng.SkipException;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for the KafkaPartitionLevelConsumer.
+ * Note: These tests require Docker to be running as they use Testcontainers.
+ */
+public class KafkaPartitionLevelConsumerTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
+ private static final long STABILIZE_SLEEP_DELAYS = 3000;
+ private static final String TEST_TOPIC_1 = "foo";
+ private static final String TEST_TOPIC_2 = "bar";
+ private static final String TEST_TOPIC_3 = "expired";
+ private static final int NUM_MSG_PRODUCED_PER_PARTITION = 1000;
+ private static final long TIMESTAMP = Instant.now().toEpochMilli();
+ private static final Random RANDOM = new Random();
+
+ private MiniKafkaCluster _kafkaCluster;
+ private String _kafkaBrokerAddress;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ // Check if Docker is available, skip tests if not
+ if (!isDockerAvailable()) {
+ throw new SkipException("Docker is not available. Skipping Kafka 4.0 consumer tests. "
+ + "These tests require Docker for Testcontainers.");
+ }
+ _kafkaCluster = new MiniKafkaCluster("0");
+ _kafkaCluster.start();
+ _kafkaBrokerAddress = _kafkaCluster.getKafkaServerAddress();
+ _kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
+ _kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
+ _kafkaCluster.createTopic(TEST_TOPIC_3, 1, 1);
+ Thread.sleep(STABILIZE_SLEEP_DELAYS);
+ produceMsgToKafka();
+ Thread.sleep(STABILIZE_SLEEP_DELAYS);
+ _kafkaCluster.deleteRecordsBeforeOffset(TEST_TOPIC_3, 0, 200);
+ }
+
+ /**
+ * Checks if Docker is available for running Testcontainers.
+ * @return true if Docker is available, false otherwise
+ */
+ private static boolean isDockerAvailable() {
+ try {
+ DockerClientFactory.instance().client();
+ return true;
+ } catch (Throwable ex) {
+ LOGGER.warn("Docker is not available: {}", ex.getMessage());
+ return false;
+ }
+ }
+
+ private void produceMsgToKafka() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _kafkaBrokerAddress);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+ for (int i = 0; i < NUM_MSG_PRODUCED_PER_PARTITION; i++) {
+ producer.send(new ProducerRecord<>(TEST_TOPIC_1, 0, TIMESTAMP + i, null, "sample_msg_" + i));
+ // TEST_TOPIC_2 has 2 partitions
+ producer.send(new ProducerRecord<>(TEST_TOPIC_2, 0, TIMESTAMP + i, null, "sample_msg_" + i));
+ producer.send(new ProducerRecord<>(TEST_TOPIC_2, 1, TIMESTAMP + i, null, "sample_msg_" + i));
+ producer.send(new ProducerRecord<>(TEST_TOPIC_3, "sample_msg_" + i));
+ }
+ producer.flush();
+ }
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ try {
+ _kafkaCluster.deleteTopic(TEST_TOPIC_1);
+ _kafkaCluster.deleteTopic(TEST_TOPIC_2);
+ _kafkaCluster.deleteTopic(TEST_TOPIC_3);
+ } finally {
+ _kafkaCluster.close();
+ }
+ }
+
+ @Test
+ public void testBuildConsumer() {
+ String streamType = "kafka";
+ String streamKafkaTopicName = "theTopic";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String clientId = "clientId";
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ streamConfigMap.put("stream.kafka.fetcher.size", "10000");
+ streamConfigMap.put("stream.kafka.fetcher.minBytes", "20000");
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ // test default value
+ KafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
createConsumer(clientId, streamConfig, 0);
+ kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
+
+
assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+
kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaBufferSize());
+
assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+
kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaSocketTimeout());
+
+ // test parsing values
+ assertEquals(10000,
kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaFetcherSizeBytes());
+ assertEquals(20000,
kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaFetcherMinBytes());
+
+ // test user defined values
+ streamConfigMap.put("stream.kafka.buffer.size", "100");
+ streamConfigMap.put("stream.kafka.socket.timeout", "1000");
+ streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+ kafkaSimpleStreamConsumer = createConsumer(clientId, streamConfig, 0);
+ kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
+ assertEquals(100, kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaBufferSize());
+ assertEquals(1000, kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaSocketTimeout());
+ }
+
+ @Test
+ public void testGetPartitionCount() {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String clientId = "clientId";
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_1);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ KafkaStreamMetadataProvider streamMetadataProvider = new
KafkaStreamMetadataProvider(clientId, streamConfig);
+ assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 1);
+
+ streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_2);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+
+ streamMetadataProvider = new KafkaStreamMetadataProvider(clientId,
streamConfig);
+ assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+ }
+
+ @Test
+ public void testFetchMessages() {
+ String streamType = "kafka";
+ String streamKafkaTopicName = "theTopic";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String clientId = "clientId";
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ int partition = 0;
+ KafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
createConsumer(clientId, streamConfig, partition);
+ kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
+ }
+
+ @Test
+ public void testFetchOffsets() {
+ testFetchOffsets(TEST_TOPIC_1);
+ testFetchOffsets(TEST_TOPIC_2);
+ }
+
+ private void testFetchOffsets(String topic) {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String clientId = "clientId";
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", topic);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ int numPartitions = new KafkaStreamMetadataProvider(clientId,
streamConfig).fetchPartitionCount(10000);
+ for (int partition = 0; partition < numPartitions; partition++) {
+ KafkaStreamMetadataProvider kafkaStreamMetadataProvider =
+ new KafkaStreamMetadataProvider(clientId, streamConfig, partition);
+ assertEquals(new
LongMsgOffset(0).compareTo(kafkaStreamMetadataProvider.fetchStreamPartitionOffset(
+ new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest(),
10000)), 0);
+ assertEquals(new
LongMsgOffset(0).compareTo(kafkaStreamMetadataProvider.fetchStreamPartitionOffset(
+ new OffsetCriteria.OffsetCriteriaBuilder().withOffsetAsPeriod("2d"),
10000)), 0);
+ assertEquals(new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION).compareTo(
+ kafkaStreamMetadataProvider.fetchStreamPartitionOffset(
+ new
OffsetCriteria.OffsetCriteriaBuilder().withOffsetAsTimestamp(Instant.now().toString()),
10000)), 0);
+ assertEquals(new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION).compareTo(
+ kafkaStreamMetadataProvider.fetchStreamPartitionOffset(
+ new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest(),
10000)), 0);
+ }
+ }
+
+ @Test
+ public void testConsumer()
+ throws Exception {
+ testConsumer(TEST_TOPIC_1);
+ testConsumer(TEST_TOPIC_2);
+ }
+
+ private void testConsumer(String topic)
+ throws TimeoutException {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String clientId = "clientId";
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", topic);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ int numPartitions = new KafkaStreamMetadataProvider(clientId,
streamConfig).fetchPartitionCount(10000);
+ for (int partition = 0; partition < numPartitions; partition++) {
+ PartitionGroupConsumer consumer =
streamConsumerFactory.createPartitionGroupConsumer(clientId,
+ new PartitionGroupConsumptionStatus(partition, 0, new
LongMsgOffset(0),
+ new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), "CONSUMING"));
+
+ // Test consume a large batch, only 500 records will be returned.
+ MessageBatch messageBatch = consumer.fetchMessages(new LongMsgOffset(0),
10000);
+ assertEquals(messageBatch.getMessageCount(), 500);
+ assertEquals(messageBatch.getUnfilteredMessageCount(), 500);
+ for (int i = 0; i < 500; i++) {
+ StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+ assertEquals(new String((byte[]) streamMessage.getValue()),
"sample_msg_" + i);
+ StreamMessageMetadata metadata = streamMessage.getMetadata();
+ assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + i);
+ StreamPartitionMsgOffset offset = metadata.getOffset();
+ assertTrue(offset instanceof LongMsgOffset);
+ assertEquals(((LongMsgOffset) offset).getOffset(), i);
+ StreamPartitionMsgOffset nextOffset = metadata.getNextOffset();
+ assertTrue(nextOffset instanceof LongMsgOffset);
+ assertEquals(((LongMsgOffset) nextOffset).getOffset(), i + 1);
+ }
+ assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "500");
+ assertEquals(messageBatch.getFirstMessageOffset().toString(), "0");
+ assertEquals(messageBatch.getLastMessageMetadata().getOffset().toString(), "499");
+ assertEquals(messageBatch.getLastMessageMetadata().getNextOffset().toString(), "500");
+
+ // Test second half batch
+ messageBatch = consumer.fetchMessages(new LongMsgOffset(500), 10000);
+ assertEquals(messageBatch.getMessageCount(), 500);
+ assertEquals(messageBatch.getUnfilteredMessageCount(), 500);
+ for (int i = 0; i < 500; i++) {
+ StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+ assertEquals(new String((byte[]) streamMessage.getValue()), "sample_msg_" + (500 + i));
+ StreamMessageMetadata metadata = streamMessage.getMetadata();
+ assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 500 + i);
+ StreamPartitionMsgOffset offset = metadata.getOffset();
+ assertTrue(offset instanceof LongMsgOffset);
+ assertEquals(((LongMsgOffset) offset).getOffset(), 500 + i);
+ StreamPartitionMsgOffset nextOffset = metadata.getNextOffset();
+ assertTrue(nextOffset instanceof LongMsgOffset);
+ assertEquals(((LongMsgOffset) nextOffset).getOffset(), 501 + i);
+ }
+ assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "1000");
+ assertEquals(messageBatch.getFirstMessageOffset().toString(), "500");
+ assertEquals(messageBatch.getLastMessageMetadata().getOffset().toString(), "999");
+ assertEquals(messageBatch.getLastMessageMetadata().getNextOffset().toString(), "1000");
+
+ // Some random range
+ messageBatch = consumer.fetchMessages(new LongMsgOffset(10), 10000);
+ assertEquals(messageBatch.getMessageCount(), 500);
+ assertEquals(messageBatch.getUnfilteredMessageCount(), 500);
+ for (int i = 0; i < 500; i++) {
+ StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+ assertEquals(new String((byte[]) streamMessage.getValue()), "sample_msg_" + (10 + i));
+ StreamMessageMetadata metadata = streamMessage.getMetadata();
+ assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 10 + i);
+ StreamPartitionMsgOffset offset = metadata.getOffset();
+ assertTrue(offset instanceof LongMsgOffset);
+ assertEquals(((LongMsgOffset) offset).getOffset(), 10 + i);
+ StreamPartitionMsgOffset nextOffset = metadata.getNextOffset();
+ assertTrue(nextOffset instanceof LongMsgOffset);
+ assertEquals(((LongMsgOffset) nextOffset).getOffset(), 11 + i);
+ }
+ assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "510");
+ assertEquals(messageBatch.getFirstMessageOffset().toString(), "10");
+ assertEquals(messageBatch.getLastMessageMetadata().getOffset().toString(), "509");
+ assertEquals(messageBatch.getLastMessageMetadata().getNextOffset().toString(), "510");
+
+ // Some random range
+ messageBatch = consumer.fetchMessages(new LongMsgOffset(610), 10000);
+ assertEquals(messageBatch.getMessageCount(), 390);
+ assertEquals(messageBatch.getUnfilteredMessageCount(), 390);
+ for (int i = 0; i < 390; i++) {
+ StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+ assertEquals(new String((byte[]) streamMessage.getValue()), "sample_msg_" + (610 + i));
+ StreamMessageMetadata metadata = streamMessage.getMetadata();
+ assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 610 + i);
+ StreamPartitionMsgOffset offset = metadata.getOffset();
+ assertTrue(offset instanceof LongMsgOffset);
+ assertEquals(((LongMsgOffset) offset).getOffset(), 610 + i);
+ StreamPartitionMsgOffset nextOffset = metadata.getNextOffset();
+ assertTrue(nextOffset instanceof LongMsgOffset);
+ assertEquals(((LongMsgOffset) nextOffset).getOffset(), 611 + i);
+ }
+ assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "1000");
+ assertEquals(messageBatch.getFirstMessageOffset().toString(), "610");
+ assertEquals(messageBatch.getLastMessageMetadata().getOffset().toString(), "999");
+ assertEquals(messageBatch.getLastMessageMetadata().getNextOffset().toString(), "1000");
+ }
+ }
+
+ protected String getKafkaConsumerFactoryName() {
+ return KafkaConsumerFactory.class.getName();
+ }
+
+ @Test
+ public void testOffsetsExpired()
+ throws TimeoutException {
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", "kafka");
+ streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_3);
+ streamConfigMap.put("stream.kafka.broker.list", _kafkaBrokerAddress);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ streamConfigMap.put("auto.offset.reset", "earliest");
+ StreamConfig streamConfig = new StreamConfig("tableName_REALTIME",
streamConfigMap);
+
+ StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ PartitionGroupConsumer consumer = streamConsumerFactory.createPartitionGroupConsumer("clientId",
+ new PartitionGroupConsumptionStatus(0, 0, new LongMsgOffset(0),
+ new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), "CONSUMING"));
+
+ // Start offset has expired. Automatically reset to earliest available and fetch whatever available
+ MessageBatch messageBatch = consumer.fetchMessages(new LongMsgOffset(0), 10000);
+ assertEquals(messageBatch.getMessageCount(), 500);
+ assertEquals(messageBatch.getUnfilteredMessageCount(), 500);
+ for (int i = 0; i < 500; i++) {
+ assertEquals(new String((byte[]) messageBatch.getStreamMessage(i).getValue()), "sample_msg_" + (200 + i));
+ }
+ assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "700");
+ }
+
+ @Test
+ public void testListTopics() {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String clientId = "clientId";
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", "NON_EXISTING_TOPIC");
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ KafkaStreamMetadataProvider streamMetadataProvider = new
KafkaStreamMetadataProvider(clientId, streamConfig);
+ List<StreamMetadataProvider.TopicMetadata> topics =
streamMetadataProvider.getTopics();
+ List<String> topicNames = topics.stream()
+ .map(StreamMetadataProvider.TopicMetadata::getName)
+ .collect(Collectors.toList());
+ assertTrue(topicNames.containsAll(List.of(TEST_TOPIC_1, TEST_TOPIC_2,
TEST_TOPIC_3)));
+ }
+
+ @Test
+ void testBatchSizeInBytesIsCalculatedCorrectly() {
+ TopicPartition topicPartition = new TopicPartition("test-topic", 0);
+
+ class FakeKafkaPartitionLevelConsumer extends KafkaPartitionLevelConsumer {
+
+ public FakeKafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+ super(clientId, streamConfig, partition);
+ }
+
+ @Override
+ protected Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+ Consumer<String, Bytes> mockConsumer = mock(Consumer.class);
+ // Mock records using Kafka 4.x compatible constructor (headers cannot be null in Kafka 4.x)
+ ConsumerRecord<String, Bytes> record1 =
+ new ConsumerRecord<>("test-topic", 0, 0L, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, 4,
+ 5, "key1", new Bytes("value1".getBytes(StandardCharsets.UTF_8)), new RecordHeaders(), null);
+ ConsumerRecord<String, Bytes> record2 =
+ new ConsumerRecord<>("test-topic", 0, 0L, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, 4,
+ 9, "key2", new Bytes("value2".getBytes(StandardCharsets.UTF_8)), new RecordHeaders(), null);
+ ConsumerRecord<String, Bytes> record3 =
+ new ConsumerRecord<>("test-topic", 0, 0L, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, 4,
+ -1, "key2", new Bytes("value2".getBytes(StandardCharsets.UTF_8)), new RecordHeaders(), null);
+ // Mock return of poll()
+ ConsumerRecords<String, Bytes> consumerRecords = new ConsumerRecords<>(
+ Map.of(topicPartition, List.of(record1, record2, record3))
+ );
+ when(mockConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+ return mockConsumer;
+ }
+ }
+
+ FakeKafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
+ new FakeKafkaPartitionLevelConsumer("clientId-test", getStreamConfig("test-topic"), 0);
+ KafkaMessageBatch kafkaMessageBatch = kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
+ Assert.assertEquals(kafkaMessageBatch.getSizeInBytes(), 14);
+ }
+
+ @Test
+ public void testFetchLatestStreamOffset()
+ throws IOException {
+ StreamConfig streamConfig = getStreamConfig(TEST_TOPIC_2);
+ try (KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider("clientId",
+ streamConfig)) {
+ Set<Integer> partitions = new HashSet<>();
+ partitions.add(0);
+ partitions.add(1);
+ Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap =
+ streamMetadataProvider.fetchLatestStreamOffset(partitions, 1000);
+ Assert.assertEquals(((LongMsgOffset) (partitionMsgOffsetMap.get(0))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+ Assert.assertEquals(((LongMsgOffset) (partitionMsgOffsetMap.get(1))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+ }
+ }
+
+ private KafkaPartitionLevelConsumer createConsumer(String clientId, StreamConfig streamConfig, int partition) {
+ if (RANDOM.nextDouble() < 0.5) {
+ return new KafkaPartitionLevelConsumer(clientId, streamConfig, partition);
+ } else {
+ return new KafkaPartitionLevelConsumer(clientId, streamConfig, partition,
+ new ExponentialBackoffRetryPolicy(2, 1000, 1.1));
+ }
+ }
+
+ private StreamConfig getStreamConfig(String topicName) {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", topicName);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+
+ return new StreamConfig(tableNameWithType, streamConfigMap);
+ }
+}
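
For reference, the configuration keys exercised in the tests above are the same ones a realtime table would use to point ingestion at this plugin. A minimal sketch, assuming placeholder topic, broker, and table names (the factory class is the new kafka40 package's KafkaConsumerFactory, and the decoder shown is the Avro decoder referenced in the stream config test below; the wrapper class itself is only illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.spi.stream.StreamConfig;

    public class Kafka40StreamConfigSketch {
      // Builds a StreamConfig wired to the Kafka 4.x consumer factory added by this commit.
      // Topic, broker list, and table name are illustrative placeholders.
      public static StreamConfig build() {
        Map<String, String> streamConfigMap = new HashMap<>();
        streamConfigMap.put("streamType", "kafka");
        streamConfigMap.put("stream.kafka.topic.name", "myTopic");
        streamConfigMap.put("stream.kafka.broker.list", "localhost:9092");
        streamConfigMap.put("stream.kafka.consumer.factory.class.name",
            "org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory");
        streamConfigMap.put("stream.kafka.decoder.class.name",
            "org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder");
        return new StreamConfig("myTable_REALTIME", streamConfigMap);
      }
    }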
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelStreamConfigTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelStreamConfigTest.java
new file mode 100644
index 00000000000..586b8e0bc91
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelStreamConfigTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.plugin.stream.kafka.KafkaPartitionLevelStreamConfig;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class KafkaPartitionLevelStreamConfigTest {
+ private static final String KAFKA_DECODER_CLASS_NAME =
+ "org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder";
+
+ private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+ String socketTimeout) {
+ return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null, null);
+ }
+
+ private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+ String socketTimeout, String isolationLevel) {
+ return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null, isolationLevel);
+ }
+
+ private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+ String socketTimeout, String fetcherSize, String fetcherMinBytes, String isolationLevel) {
+ return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, fetcherSize, fetcherMinBytes, isolationLevel,
+ null);
+ }
+
+ private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+ String socketTimeout, String fetcherSize, String fetcherMinBytes, String isolationLevel,
+ String populateRowMetadata) {
+ return new KafkaPartitionLevelStreamConfig(
+ getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, fetcherSize, fetcherMinBytes, isolationLevel,
+ populateRowMetadata, "tableName_REALTIME"));
+ }
+
+ private StreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, String socketTimeout,
+ String fetcherSize, String fetcherMinBytes, String isolationLevel, String populateRowMetadata,
+ String tableNameWithType) {
+ Map<String, String> streamConfigMap = new HashMap<>();
+ String streamType = "kafka";
+ String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
+ streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClassName);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ KAFKA_DECODER_CLASS_NAME);
+ streamConfigMap.put("stream.kafka.broker.list", bootstrapHosts);
+ if (buffer != null) {
+ streamConfigMap.put("stream.kafka.buffer.size", buffer);
+ }
+ if (socketTimeout != null) {
+ streamConfigMap.put("stream.kafka.socket.timeout", socketTimeout);
+ }
+ if (fetcherSize != null) {
+ streamConfigMap.put("stream.kafka.fetcher.size", fetcherSize);
+ }
+ if (fetcherMinBytes != null) {
+ streamConfigMap.put("stream.kafka.fetcher.minBytes", fetcherMinBytes);
+ }
+ if (isolationLevel != null) {
+ streamConfigMap.put("stream.kafka.isolation.level", isolationLevel);
+ }
+ if (populateRowMetadata != null) {
+ streamConfigMap.put("stream.kafka.metadata.populate",
populateRowMetadata);
+ }
+ return new StreamConfig(tableNameWithType, streamConfigMap);
+ }
+
+ @Test
+ public void testGetKafkaIsolationLevel() {
+ KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "", "",
"", "read_committed");
+ Assert.assertEquals("read_committed", config.getKafkaIsolationLevel());
+ }
+
+ @Test
+ public void testGetKafkaTopicName() {
+ KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "", "",
"");
+ Assert.assertEquals("topic", config.getKafkaTopicName());
+ }
+
+ @Test
+ public void testGetBootstrapHosts() {
+ KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1",
"", "");
+ Assert.assertEquals("host1", config.getBootstrapHosts());
+ }
+
+ @Test
+ public void testGetKafkaBufferSize() {
+ // test default
+ KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1",
null, "");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ config.getKafkaBufferSize());
+
+ config = getStreamConfig("topic", "host1", "", "");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ config.getKafkaBufferSize());
+
+ config = getStreamConfig("topic", "host1", "bad value", "");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ config.getKafkaBufferSize());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "100", "");
+ Assert.assertEquals(100, config.getKafkaBufferSize());
+ }
+
+ @Test
+ public void testGetKafkaSocketTimeout() {
+ // test default
+ KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1",
"", null);
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+ config.getKafkaSocketTimeout());
+
+ config = getStreamConfig("topic", "host1", "", "");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+ config.getKafkaSocketTimeout());
+
+ config = getStreamConfig("topic", "host1", "", "bad value");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+ config.getKafkaSocketTimeout());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "", "100");
+ Assert.assertEquals(100, config.getKafkaSocketTimeout());
+ }
+
+ @Test
+ public void testGetFetcherSize() {
+ // test default
+ KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1",
"", "", "", null, null);
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ config.getKafkaFetcherSizeBytes());
+
+ config = getStreamConfig("topic", "host1", "100", "", "", null, null);
+ Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+ config = getStreamConfig("topic", "host1", "100", "", "bad value", null,
null);
+ Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "100", "", "200", null, null);
+ Assert.assertEquals(200, config.getKafkaFetcherSizeBytes());
+ }
+
+ @Test
+ public void testGetFetcherMinBytes() {
+ // test default
+ KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1",
"", "", "", null, null);
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+ config.getKafkaFetcherMinBytes());
+
+ config = getStreamConfig("topic", "host1", "", "", "", "", null);
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+ config.getKafkaFetcherMinBytes());
+
+ config = getStreamConfig("topic", "host1", "", "", "", "bad value", null);
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+ config.getKafkaFetcherMinBytes());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "", "", "", "100", null);
+ Assert.assertEquals(100, config.getKafkaFetcherMinBytes());
+ }
+
+ @Test
+ public void testIsPopulateRowMetadata() {
+ // test default
+ KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1",
null, null, null, null, null, null);
+ Assert.assertFalse(config.isPopulateMetadata());
+
+ config = getStreamConfig("topic", "host1", null, null, null, null, null,
"bad value");
+ Assert.assertFalse(config.isPopulateMetadata());
+
+ config = getStreamConfig("topic", "host1", null, null, null, null, null,
"TrUe");
+ Assert.assertTrue(config.isPopulateMetadata());
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/utils/MiniKafkaCluster.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/utils/MiniKafkaCluster.java
new file mode 100644
index 00000000000..03535d2ee81
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/utils/MiniKafkaCluster.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40.utils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.testcontainers.kafka.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+
+/**
+ * MiniKafkaCluster for Kafka 4.x using Testcontainers with KRaft mode (no ZooKeeper).
+ * Uses the apache/kafka image defined by KAFKA_IMAGE for the broker container.
+ */
+public final class MiniKafkaCluster implements Closeable {
+
+ private static final String KAFKA_IMAGE = "apache/kafka:4.0.0";
+
+ private final KafkaContainer _kafkaContainer;
+
+ public MiniKafkaCluster(String brokerId)
+ throws Exception {
+ // Use apache/kafka image which supports KRaft mode
+ _kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
+ }
+
+ public void start()
+ throws Exception {
+ _kafkaContainer.start();
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _kafkaContainer.stop();
+ }
+
+ public String getKafkaServerAddress() {
+ return _kafkaContainer.getBootstrapServers();
+ }
+
+ private AdminClient getOrCreateAdminClient() {
+ Properties kafkaClientConfig = new Properties();
+ kafkaClientConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaServerAddress());
+ return AdminClient.create(kafkaClientConfig);
+ }
+
+ public void createTopic(String topicName, int numPartitions, int replicationFactor)
+ throws ExecutionException, InterruptedException {
+ try (AdminClient adminClient = getOrCreateAdminClient()) {
+ NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) replicationFactor);
+ int retries = 5;
+ while (retries > 0) {
+ try {
+ adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
+ return;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException) {
+ retries--;
+ TimeUnit.SECONDS.sleep(1);
+ } else {
+ throw e;
+ }
+ }
+ }
+ throw new ExecutionException("Failed to create topic after retries", null);
+ }
+ }
+
+ public void deleteTopic(String topicName)
+ throws ExecutionException, InterruptedException {
+ try (AdminClient adminClient = getOrCreateAdminClient()) {
+ adminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
+ }
+ }
+
+ public void deleteRecordsBeforeOffset(String topicName, int partitionId, long offset)
+ throws ExecutionException, InterruptedException {
+ try (AdminClient adminClient = getOrCreateAdminClient()) {
+ Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
+ recordsToDelete.put(new TopicPartition(topicName, partitionId), RecordsToDelete.beforeOffset(offset));
+ // Wait for the deletion to complete
+ adminClient.deleteRecords(recordsToDelete).all().get();
+ }
+ }
+}
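
A minimal usage sketch of this helper, mirroring the lifecycle in the consumer test above (the topic name is an illustrative placeholder, and Docker must be running for the container to start):

    import org.apache.pinot.plugin.stream.kafka40.utils.MiniKafkaCluster;

    public class MiniKafkaClusterUsageSketch {
      public static void main(String[] args) throws Exception {
        // Start a single-broker Kafka 4.x container, create a topic, expose the
        // bootstrap address to producers/consumers, then tear everything down.
        MiniKafkaCluster cluster = new MiniKafkaCluster("0");
        cluster.start();
        try {
          cluster.createTopic("sketch-topic", 1, 1);
          String bootstrapServers = cluster.getKafkaServerAddress();
          System.out.println("Kafka 4.x broker available at " + bootstrapServers);
          cluster.deleteTopic("sketch-topic");
        } finally {
          cluster.close();
        }
      }
    }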
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/resources/log4j2.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/resources/log4j2.xml
new file mode 100644
index 00000000000..439331f9d7b
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/resources/log4j2.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<Configuration>
+ <Appenders>
+ <Console name="console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="org.apache.pinot" level="warn" additivity="false">
+ <AppenderRef ref="console"/>
+ </Logger>
+ <Root level="error">
+ <AppenderRef ref="console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/pinot-plugins/pinot-stream-ingestion/pom.xml b/pinot-plugins/pinot-stream-ingestion/pom.xml
index 58a62ebb789..87e9760d4c2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pom.xml
@@ -39,6 +39,7 @@
<module>pinot-kafka-base</module>
<module>pinot-kafka-2.0</module>
<module>pinot-kafka-3.0</module>
+ <module>pinot-kafka-4.0</module>
<module>pinot-kinesis</module>
<module>pinot-pulsar</module>
</modules>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
index 595c0638e08..37abe9b03f3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
@@ -95,6 +95,7 @@ public class PluginManager {
put("org.apache.pinot.filesystem.LocalPinotFS",
"org.apache.pinot.spi.filesystem.LocalPinotFS");
// StreamConsumerFactory
+ // Old-style class names mapped to current plugin packages
put("org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory");
put("org.apache.pinot.core.realtime.impl.kafka3.KafkaConsumerFactory",
diff --git a/pom.xml b/pom.xml
index 5833c56d11a..8c5c8d81da6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,6 +197,7 @@
<spark3.version>3.5.8</spark3.version>
<kafka2.version>2.8.2</kafka2.version>
<kafka3.version>3.9.1</kafka3.version>
+ <kafka4.version>4.1.1</kafka4.version>
<confluent.version>7.9.5</confluent.version>
<pulsar.version>4.0.8</pulsar.version>
<flink.version>1.20.3</flink.version>
@@ -684,6 +685,11 @@
<artifactId>pinot-kafka-3.0</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-kafka-4.0</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-kinesis</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]