This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a5eae57fc81 Add Kafka 4.x client support (#17633)
a5eae57fc81 is described below

commit a5eae57fc81fa7232e1199d45b9b19817b0e611c
Author: Arunkumar Saravanan <[email protected]>
AuthorDate: Fri Feb 6 12:47:42 2026 +0530

    Add Kafka 4.x client support (#17633)
---
 pinot-integration-tests/pom.xml                    |   5 +
 .../LLCRealtimeKafka4ClusterIntegrationTest.java   | 142 ++++++
 .../pinot-stream-ingestion/pinot-kafka-4.0/pom.xml |  84 ++++
 .../stream/kafka40/KafkaConsumerFactory.java       |  70 +++
 .../KafkaPartitionLevelConnectionHandler.java      | 234 +++++++++
 .../kafka40/KafkaPartitionLevelConsumer.java       | 136 ++++++
 .../kafka40/KafkaStreamMetadataProvider.java       | 285 +++++++++++
 .../SynchronizedKafkaStreamMetadataProvider.java   |  44 ++
 .../kafka40/KafkaAdminClientManagerTest.java       | 130 +++++
 .../KafkaPartitionLevelConnectionHandlerTest.java  | 103 ++++
 .../kafka40/KafkaPartitionLevelConsumerTest.java   | 534 +++++++++++++++++++++
 .../KafkaPartitionLevelStreamConfigTest.java       | 203 ++++++++
 .../stream/kafka40/utils/MiniKafkaCluster.java     | 113 +++++
 .../pinot-kafka-4.0/src/test/resources/log4j2.xml  |  36 ++
 pinot-plugins/pinot-stream-ingestion/pom.xml       |   1 +
 .../org/apache/pinot/spi/plugin/PluginManager.java |   1 +
 pom.xml                                            |   6 +
 17 files changed, 2127 insertions(+)
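
For context, a realtime table opts into this plugin through its stream configuration. Below is a minimal sketch (not part of this commit) of a stream config pointing at the new factory class; the table name, topic, broker list, and decoder class are placeholders, while the property keys mirror the test config further down in this diff:

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.stream.StreamConfig;

public class Kafka4StreamConfigSketch {
  // Builds a StreamConfig wired to the Kafka 4.x consumer factory added by this commit.
  public static StreamConfig build() {
    Map<String, String> streamConfigMap = new HashMap<>();
    streamConfigMap.put("streamType", "kafka");
    streamConfigMap.put("stream.kafka.topic.name", "myTopic");              // placeholder topic
    streamConfigMap.put("stream.kafka.broker.list", "localhost:9092");      // placeholder broker list
    streamConfigMap.put("stream.kafka.consumer.factory.class.name",
        "org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory");
    streamConfigMap.put("stream.kafka.decoder.class.name", "myDecoderClass"); // placeholder decoder
    return new StreamConfig("myTable_REALTIME", streamConfigMap);
  }
}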

diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index c68d88c3cf6..7af0bfa0c06 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -253,6 +253,11 @@
       <artifactId>kafka</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-kafka-4.0</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.h2database</groupId>
       <artifactId>h2</artifactId>
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka4ClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka4ClusterIntegrationTest.java
new file mode 100644
index 00000000000..7b101d49d52
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka4ClusterIntegrationTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory;
+import org.apache.pinot.plugin.stream.kafka40.KafkaPartitionLevelConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test for low-level Kafka 4.x consumer.
+ */
+public class LLCRealtimeKafka4ClusterIntegrationTest extends LLCRealtimeClusterIntegrationTest {
+
+  @Override
+  protected Map<String, String> getStreamConfigMap() {
+    Map<String, String> streamConfigMap = super.getStreamConfigMap();
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(
+        streamConfigMap.get(StreamConfigProperties.STREAM_TYPE),
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), ExceptingKafka4ConsumerFactory.class.getName());
+    ExceptingKafka4ConsumerFactory.init(getHelixClusterName(), _helixAdmin, getTableName());
+    return streamConfigMap;
+  }
+
+  public static class ExceptingKafka4ConsumerFactory extends KafkaConsumerFactory {
+
+    public static final int PARTITION_FOR_EXCEPTIONS = 1; // Setting this to -1 disables all exceptions thrown.
+    public static final int SEQ_NUM_FOR_CREATE_EXCEPTION = 1;
+    public static final int SEQ_NUM_FOR_CONSUME_EXCEPTION = 3;
+
+    private static HelixAdmin _helixAdmin;
+    private static String _helixClusterName;
+    private static String _tableName;
+    public ExceptingKafka4ConsumerFactory() {
+      super();
+    }
+
+    public static void init(String helixClusterName, HelixAdmin helixAdmin, String tableName) {
+      _helixAdmin = helixAdmin;
+      _helixClusterName = helixClusterName;
+      _tableName = tableName;
+    }
+
+    @Override
+    public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+        PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
+      /*
+       * The segment data manager is creating a consumer to consume rows into a segment.
+       * Check the partition and sequence number of the segment and decide whether it
+       * qualifies for:
+       * - Throwing exception during create OR
+       * - Throwing exception during consumption.
+       * Make sure that this still works if retries are added in RealtimeSegmentDataManager
+       */
+      int partition = partitionGroupConsumptionStatus.getPartitionGroupId();
+      boolean exceptionDuringConsume = false;
+      int seqNum = getSegmentSeqNum(partition);
+      if (partition == PARTITION_FOR_EXCEPTIONS) {
+        if (seqNum == SEQ_NUM_FOR_CREATE_EXCEPTION) {
+          throw new RuntimeException("TestException during consumer creation");
+        } else if (seqNum == SEQ_NUM_FOR_CONSUME_EXCEPTION) {
+          exceptionDuringConsume = true;
+        }
+      }
+      return new ExceptingKafka4Consumer(clientId, _streamConfig, partition, exceptionDuringConsume);
+    }
+
+    @Override
+    public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+        PartitionGroupConsumptionStatus partitionGroupConsumptionStatus, RetryPolicy retryPolicy) {
+      return createPartitionGroupConsumer(clientId, partitionGroupConsumptionStatus);
+    }
+
+    private int getSegmentSeqNum(int partition) {
+      IdealState is = _helixAdmin.getResourceIdealState(_helixClusterName,
+          TableNameBuilder.REALTIME.tableNameWithType(_tableName));
+      AtomicInteger seqNum = new AtomicInteger(-1);
+      is.getPartitionSet().forEach(segmentNameStr -> {
+        if (LLCSegmentName.isLLCSegment(segmentNameStr)) {
+          if (is.getInstanceStateMap(segmentNameStr).values().contains(
+              CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) {
+            LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+            if (segmentName.getPartitionGroupId() == partition) {
+              seqNum.set(segmentName.getSequenceNumber());
+            }
+          }
+        }
+      });
+      assertTrue(seqNum.get() >= 0, "No consuming segment found in partition: " + partition);
+      return seqNum.get();
+    }
+
+    public static class ExceptingKafka4Consumer extends KafkaPartitionLevelConsumer {
+      private final boolean _exceptionDuringConsume;
+
+      public ExceptingKafka4Consumer(String clientId, StreamConfig streamConfig, int partition,
+          boolean exceptionDuringConsume) {
+        super(clientId, streamConfig, partition);
+        _exceptionDuringConsume = exceptionDuringConsume;
+      }
+
+      @Override
+      public KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs) {
+        if (_exceptionDuringConsume) {
+          throw new RuntimeException("TestException during consumption");
+        }
+        return super.fetchMessages(startOffset, timeoutMs);
+      }
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/pom.xml
new file mode 100644
index 00000000000..68df0ed23ba
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>pinot-stream-ingestion</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>1.5.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pinot-kafka-4.0</artifactId>
+  <name>Pinot Kafka 4.x</name>
+  <url>https://pinot.apache.org/</url>
+  <properties>
+    <pinot.root>${basedir}/../../..</pinot.root>
+    <shade.phase.prop>package</shade.phase.prop>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-kafka-base</artifactId>
+    </dependency>
+
+    <!-- Kafka 4.x client (pure Java, no Scala dependency) -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka4.version}</version>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-kafka-base</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- Testcontainers for embedded Kafka testing with KRaft mode -->
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>kafka</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>pinot-fastdev</id>
+      <properties>
+        <shade.phase.prop>none</shade.phase.prop>
+      </properties>
+    </profile>
+  </profiles>
+</project>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaConsumerFactory.java
new file mode 100644
index 00000000000..c1eec69aa5e
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaConsumerFactory.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import org.apache.pinot.plugin.stream.kafka.KafkaConfigBackwardCompatibleUtils;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+
+
+public class KafkaConsumerFactory extends StreamConsumerFactory {
+
+  @Override
+  protected void init(StreamConfig streamConfig) {
+    KafkaConfigBackwardCompatibleUtils.handleStreamConfig(streamConfig);
+    super.init(streamConfig);
+  }
+
+  @Override
+  public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+    return new KafkaStreamMetadataProvider(clientId, _streamConfig, partition);
+  }
+
+  @Override
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+    return new KafkaStreamMetadataProvider(clientId, _streamConfig);
+  }
+
+  @Override
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId, boolean concurrentAccessExpected) {
+    if (concurrentAccessExpected) {
+      return new SynchronizedKafkaStreamMetadataProvider(clientId, _streamConfig);
+    } else {
+      return createStreamMetadataProvider(clientId);
+    }
+  }
+
+  @Override
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+      PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
+    return new KafkaPartitionLevelConsumer(clientId, _streamConfig,
+        partitionGroupConsumptionStatus.getStreamPartitionGroupId());
+  }
+
+  @Override
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+      PartitionGroupConsumptionStatus partitionGroupConsumptionStatus, RetryPolicy retryPolicy) {
+    return new KafkaPartitionLevelConsumer(clientId, _streamConfig,
+        partitionGroupConsumptionStatus.getStreamPartitionGroupId(), retryPolicy);
+  }
+}
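
For illustration only (not part of this commit): since init(StreamConfig) is protected, callers normally obtain this factory through the stream SPI rather than constructing it directly. A rough sketch, assuming the standard StreamConsumerFactoryProvider entry point and a stream config like the one sketched near the top of this message:

import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;

public class Kafka4FactoryUsageSketch {
  // Resolves the configured factory (here the kafka40 KafkaConsumerFactory above) and
  // uses it to fetch the partition count of the configured topic.
  public static int fetchPartitionCount(StreamConfig streamConfig) throws Exception {
    StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfig);
    try (StreamMetadataProvider metadataProvider =
        factory.createStreamMetadataProvider("kafka4-example-client")) {
      return metadataProvider.fetchPartitionCount(10_000L);
    }
  }
}
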
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandler.java
new file mode 100644
index 00000000000..36c24f43771
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandler.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.KafkaAdminClientManager;
+import org.apache.pinot.plugin.stream.kafka.KafkaPartitionLevelStreamConfig;
+import org.apache.pinot.plugin.stream.kafka.KafkaSSLUtils;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * KafkaPartitionLevelConnectionHandler provides low level APIs to access Kafka partition level information.
+ * E.g. partition counts, offsets per partition.
+ *
+ */
+public abstract class KafkaPartitionLevelConnectionHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConnectionHandler.class);
+  private static final Set<String> CONSUMER_CONFIG_NAMES = ConsumerConfig.configNames();
+  private static final Set<String> ADMIN_CLIENT_CONFIG_NAMES = AdminClientConfig.configNames();
+  protected final KafkaPartitionLevelStreamConfig _config;
+  protected final String _clientId;
+  protected final int _partition;
+  protected final String _topic;
+  protected final Consumer<String, Bytes> _consumer;
+  protected final TopicPartition _topicPartition;
+  protected final Properties _consumerProp;
+  protected volatile KafkaAdminClientManager.AdminClientReference _sharedAdminClientRef;
+
+  public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
+    this(clientId, streamConfig, partition, null);
+  }
+
+  public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition,
+      @Nullable RetryPolicy retryPolicy) {
+    _config = new KafkaPartitionLevelStreamConfig(streamConfig);
+    _clientId = clientId;
+    _partition = partition;
+    _topic = _config.getKafkaTopicName();
+    _consumerProp = buildProperties(streamConfig);
+    KafkaSSLUtils.initSSL(_consumerProp);
+    if (retryPolicy == null) {
+      _consumer = createConsumer(_consumerProp);
+    } else {
+      _consumer = createConsumer(_consumerProp, retryPolicy);
+    }
+    _topicPartition = new TopicPartition(_topic, _partition);
+    _consumer.assign(Collections.singletonList(_topicPartition));
+  }
+
+  private Properties buildProperties(StreamConfig streamConfig) {
+    Properties consumerProp = new Properties();
+    consumerProp.putAll(streamConfig.getStreamConfigsMap());
+    consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
+    consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    if (_config.getKafkaIsolationLevel() != null) {
+      consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
+    }
+    consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
+    return consumerProp;
+  }
+
+  /**
+   * Filter properties to only include the specified Kafka configurations.
+   * This prevents "was supplied but isn't a known config" warnings from Kafka clients.
+   *
+   * @param props The properties to filter
+   * @param validConfigNames The set of valid configuration names for the target Kafka client
+   * @return A new Properties object containing only the valid configurations
+   */
+  private Properties filterKafkaProperties(Properties props, Set<String> validConfigNames) {
+    Properties filteredProps = new Properties();
+    for (String key : props.stringPropertyNames()) {
+      if (validConfigNames.contains(key)) {
+        filteredProps.put(key, props.get(key));
+      }
+    }
+    return filteredProps;
+  }
+
+  private Consumer<String, Bytes> createConsumer(Properties consumerProp, RetryPolicy retryPolicy) {
+    AtomicReference<Consumer<String, Bytes>> consumer = new AtomicReference<>();
+    try {
+      retryPolicy.attempt(() -> {
+        try {
+          consumer.set(new KafkaConsumer<>(filterKafkaProperties(consumerProp, CONSUMER_CONFIG_NAMES)));
+          return true;
+        } catch (Exception e) {
+          LOGGER.warn("Caught exception while creating Kafka consumer, retrying.", e);
+          return false;
+        }
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
+      throw new RuntimeException(e);
+    }
+    return consumer.get();
+  }
+
+  @VisibleForTesting
+  protected Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+    return retry(() -> new KafkaConsumer<>(filterKafkaProperties(consumerProp, CONSUMER_CONFIG_NAMES)), 5);
+  }
+
+  protected AdminClient createAdminClient() {
+    return retry(() -> AdminClient.create(filterKafkaProperties(_consumerProp, ADMIN_CLIENT_CONFIG_NAMES)), 5);
+  }
+
+  /**
+   * Creates an admin client instance. Note that, despite the name, this method creates
+   * a new admin client on every call rather than reusing one; prefer
+   * {@link #getOrCreateSharedAdminClient()} to avoid the overhead of creating new
+   * connections.
+   *
+   * @return the admin client instance
+   */
+  @Deprecated
+  protected AdminClient getOrCreateAdminClient() {
+    return createAdminClient();
+  }
+
+  /**
+   * Gets or creates a shared admin client instance that can be reused across multiple
+   * connection handlers connecting to the same Kafka cluster. This provides better
+   * resource efficiency when multiple consumers/producers connect to the same bootstrap servers.
+   *
+   * @return the shared admin client instance
+   */
+  protected AdminClient getOrCreateSharedAdminClient() {
+    return getOrCreateSharedAdminClientInternal(false);
+  }
+
+  private AdminClient getOrCreateSharedAdminClientInternal(boolean isRetry) {
+    KafkaAdminClientManager.AdminClientReference ref = _sharedAdminClientRef;
+    if (ref == null) {
+      synchronized (this) {
+        ref = _sharedAdminClientRef;
+        if (ref == null) {
+          ref = KafkaAdminClientManager.getInstance().getOrCreateAdminClient(_consumerProp);
+          _sharedAdminClientRef = ref;
+        }
+      }
+    }
+    try {
+      return ref.getAdminClient();
+    } catch (IllegalStateException e) {
+      if (isRetry) {
+        throw new RuntimeException("Failed to create admin client after retry", e);
+      }
+      // Reference was closed, retry once
+      synchronized (this) {
+        _sharedAdminClientRef = null;
+      }
+      return getOrCreateSharedAdminClientInternal(true);
+    }
+  }
+
+  private static <T> T retry(Supplier<T> s, int nRetries) {
+    // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
+    // We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
+    // like a good balance of not waiting too long for a retry, but also not retrying too many times.
+    int tries = 0;
+    while (true) {
+      try {
+        return s.get();
+      } catch (KafkaException e) {
+        tries++;
+        if (tries >= nRetries) {
+          LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
+          throw e;
+        }
+        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, nRetries, e);
+        // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
+        // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
+        // stuck in ERROR state.
+        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+      }
+    }
+  }
+
+  public void close()
+      throws IOException {
+    _consumer.close();
+    if (_sharedAdminClientRef != null) {
+      _sharedAdminClientRef.close();
+      _sharedAdminClientRef = null;
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaPartitionLevelStreamConfig getKafkaPartitionLevelStreamConfig() {
+    return _config;
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumer.java
new file mode 100644
index 00000000000..589736ef444
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumer.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.BytesStreamMessage;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
+    implements PartitionGroupConsumer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
+
+  private long _lastFetchedOffset = -1;
+
+  public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+  }
+
+  public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition,
+      RetryPolicy retryPolicy) {
+    super(clientId, streamConfig, partition, retryPolicy);
+  }
+
+  @Override
+  public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
+    long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Polling partition: {}, startOffset: {}, timeout: {}ms", _topicPartition, startOffset, timeoutMs);
+    }
+    if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Seeking to offset: {}", startOffset);
+      }
+      _consumer.seek(_topicPartition, startOffset);
+    }
+
+    ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
+    List<ConsumerRecord<String, Bytes>> records = consumerRecords.records(_topicPartition);
+    List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size());
+    long firstOffset = -1;
+    long offsetOfNextBatch = startOffset;
+    StreamMessageMetadata lastMessageMetadata = null;
+    long batchSizeInBytes = 0;
+    if (!records.isEmpty()) {
+      firstOffset = records.get(0).offset();
+      _lastFetchedOffset = records.get(records.size() - 1).offset();
+      offsetOfNextBatch = _lastFetchedOffset + 1;
+      for (ConsumerRecord<String, Bytes> record : records) {
+        StreamMessageMetadata messageMetadata = extractMessageMetadata(record);
+        Bytes message = record.value();
+        if (message != null) {
+          String key = record.key();
+          byte[] keyBytes = key != null ? key.getBytes(StandardCharsets.UTF_8) : null;
+          filteredRecords.add(new BytesStreamMessage(keyBytes, message.get(), messageMetadata));
+          if (messageMetadata.getRecordSerializedSize() > 0) {
+            batchSizeInBytes += messageMetadata.getRecordSerializedSize();
+          }
+        } else if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Tombstone message at offset: {}", record.offset());
+        }
+        lastMessageMetadata = messageMetadata;
+      }
+    }
+
+    // In case read_committed is enabled, the messages consumed are not guaranteed to have consecutive offsets.
+    // TODO: A better solution would be to fetch earliest offset from topic and see if it is greater than startOffset.
+    // However, this would require an additional call to Kafka which we want to avoid.
+    boolean hasDataLoss = false;
+    if (_config.getKafkaIsolationLevel() == null || _config.getKafkaIsolationLevel()
+        .equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)) {
+      hasDataLoss = firstOffset > startOffset;
+    }
+    return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata,
+        hasDataLoss, batchSizeInBytes);
+  }
+
+  private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, Bytes> record) {
+    long timestamp = record.timestamp();
+    long offset = record.offset();
+
+    StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp)
+        .setOffset(new LongMsgOffset(offset), new LongMsgOffset(offset + 1))
+        .setSerializedValueSize(record.serializedValueSize());
+    if (_config.isPopulateMetadata()) {
+      Headers headers = record.headers();
+      if (headers != null) {
+        GenericRow headerGenericRow = new GenericRow();
+        for (Header header : headers.toArray()) {
+          headerGenericRow.putValue(header.key(), header.value());
+        }
+        builder.setHeaders(headerGenericRow);
+      }
+      builder.setMetadata(Map.of(KafkaStreamMessageMetadata.RECORD_TIMESTAMP_KEY, String.valueOf(timestamp),
+          KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, String.valueOf(offset),
+          KafkaStreamMessageMetadata.METADATA_PARTITION_KEY, String.valueOf(record.partition())));
+    }
+    return builder.build();
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
new file mode 100644
index 00000000000..032fbf66fe2
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.TransientConsumerException;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler
+    implements StreamMetadataProvider {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);
+
+  public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+    this(clientId, streamConfig, Integer.MIN_VALUE);
+  }
+
+  public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+  }
+
+  @Override
+  public int fetchPartitionCount(long timeoutMillis) {
+    try {
+      List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis));
+      if (CollectionUtils.isNotEmpty(partitionInfos)) {
+        return partitionInfos.size();
+      }
+      throw new RuntimeException(String.format("Failed to fetch partition information for topic: %s", _topic));
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
+    }
+  }
+
+  @Override
+  public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+    try {
+      List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis));
+      if (CollectionUtils.isEmpty(partitionInfos)) {
+        throw new RuntimeException(String.format("Failed to fetch partition information for topic: %s", _topic));
+      }
+      Set<Integer> partitionIds = Sets.newHashSetWithExpectedSize(partitionInfos.size());
+      for (PartitionInfo partitionInfo : partitionInfos) {
+        partitionIds.add(partitionInfo.partition());
+      }
+      return partitionIds;
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
+    }
+  }
+
+  @Override
+  public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+    List<TopicPartition> topicPartitions = new ArrayList<>(partitionIds.size());
+    for (Integer streamPartition: partitionIds) {
+      topicPartitions.add(new TopicPartition(_topic, streamPartition));
+    }
+    try {
+      Map<TopicPartition, Long> topicPartitionToLatestOffsetMap =
+          _consumer.endOffsets(topicPartitions, Duration.ofMillis(timeoutMillis));
+
+      Map<Integer, StreamPartitionMsgOffset> partitionIdToLatestOffset =
+          new HashMap<>(topicPartitionToLatestOffsetMap.size());
+      for (Map.Entry<TopicPartition, Long> entry : topicPartitionToLatestOffsetMap.entrySet()) {
+        partitionIdToLatestOffset.put(entry.getKey().partition(), new LongMsgOffset(entry.getValue()));
+      }
+
+      return partitionIdToLatestOffset;
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
+    }
+  }
+
+  @Override
+  public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
+    Preconditions.checkNotNull(offsetCriteria);
+    long offset;
+    try {
+      if (offsetCriteria.isLargest()) {
+        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+            .get(_topicPartition);
+      } else if (offsetCriteria.isSmallest()) {
+        offset =
+            _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+                .get(_topicPartition);
+      } else if (offsetCriteria.isPeriod()) {
+        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+                Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
+            .get(_topicPartition);
+        if (offsetAndTimestamp == null) {
+          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+              .get(_topicPartition);
+          LOGGER.warn(
+              "initial offset type is period and its value evaluates to null hence proceeding with offset {} for "
+                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+        } else {
+          offset = offsetAndTimestamp.offset();
+        }
+      } else if (offsetCriteria.isTimestamp()) {
+        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+            TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
+        if (offsetAndTimestamp == null) {
+          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+              .get(_topicPartition);
+          LOGGER.warn(
+              "initial offset type is timestamp and its value evaluates to null hence proceeding with offset {} for "
+                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+        } else {
+          offset = offsetAndTimestamp.offset();
+        }
+      } else {
+        throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+      }
+      return new LongMsgOffset(offset);
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
+    }
+  }
+
+  @Override
+  public Map<String, PartitionLagState> getCurrentPartitionLagState(
+      Map<String, ConsumerPartitionState> currentPartitionStateMap) {
+    Map<String, PartitionLagState> perPartitionLag = new HashMap<>();
+    for (Map.Entry<String, ConsumerPartitionState> entry : currentPartitionStateMap.entrySet()) {
+      ConsumerPartitionState partitionState = entry.getValue();
+      // Compute records-lag
+      StreamPartitionMsgOffset currentOffset = partitionState.getCurrentOffset();
+      StreamPartitionMsgOffset upstreamLatest = partitionState.getUpstreamLatestOffset();
+      String offsetLagString = "UNKNOWN";
+
+      if (currentOffset instanceof LongMsgOffset && upstreamLatest instanceof LongMsgOffset) {
+        long offsetLag = ((LongMsgOffset) upstreamLatest).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
+        offsetLagString = String.valueOf(offsetLag);
+      }
+
+      // Compute record-availability
+      String availabilityLagMs = "UNKNOWN";
+      StreamMessageMetadata lastProcessedMessageMetadata = partitionState.getLastProcessedRowMetadata();
+      if (lastProcessedMessageMetadata != null && partitionState.getLastProcessedTimeMs() > 0) {
+        long availabilityLag =
+            partitionState.getLastProcessedTimeMs() - lastProcessedMessageMetadata.getRecordIngestionTimeMs();
+        availabilityLagMs = String.valueOf(availabilityLag);
+      }
+
+      perPartitionLag.put(entry.getKey(), new KafkaConsumerPartitionLag(offsetLagString, availabilityLagMs));
+    }
+    return perPartitionLag;
+  }
+
+  @Override
+  public List<TopicMetadata> getTopics() {
+    try {
+      AdminClient adminClient = getOrCreateSharedAdminClient();
+      ListTopicsResult result = adminClient.listTopics();
+      if (result == null) {
+        return Collections.emptyList();
+      }
+      return result.names()
+          .get()
+          .stream()
+          .map(topic -> new KafkaTopicMetadata().setName(topic))
+          .collect(Collectors.toList());
+    } catch (ExecutionException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean supportsOffsetLag() {
+    return true;
+  }
+
+  public static class KafkaTopicMetadata implements TopicMetadata {
+    private String _name;
+
+    public String getName() {
+      return _name;
+    }
+
+    public KafkaTopicMetadata setName(String name) {
+      _name = name;
+      return this;
+    }
+  }
+
+
+
+  @Override
+  public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis, long timeoutMillis) {
+    try {
+      OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Map.of(_topicPartition, timestampMillis),
+          Duration.ofMillis(timeoutMillis)).get(_topicPartition);
+      if (offsetAndTimestamp == null) {
+        return null;
+      }
+      return new LongMsgOffset(offsetAndTimestamp.offset());
+    } catch (Exception e) {
+      LOGGER.warn("Failed to get offset at timestamp {} for partition {}", timestampMillis, partitionId, e);
+      return null;
+    }
+  }
+
+  @Override
+  public Map<String, StreamPartitionMsgOffset> getStreamStartOffsets() {
+    List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic);
+    Map<TopicPartition, Long> startOffsets = _consumer.beginningOffsets(
+        partitionInfos.stream()
+            .map(info -> new TopicPartition(_topic, info.partition()))
+            .collect(Collectors.toList()));
+    return startOffsets.entrySet().stream().collect(
+        Collectors.toMap(
+            entry -> String.valueOf(entry.getKey().partition()),
+            entry -> new LongMsgOffset(entry.getValue()),
+            (existingValue, newValue) -> newValue
+        ));
+  }
+
+  @Override
+  public Map<String, StreamPartitionMsgOffset> getStreamEndOffsets() {
+    List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic);
+    Map<TopicPartition, Long> endOffsets = _consumer.endOffsets(
+        partitionInfos.stream()
+            .map(info -> new TopicPartition(_topic, info.partition()))
+            .collect(Collectors.toList()));
+    return endOffsets.entrySet().stream().collect(
+        Collectors.toMap(
+            entry -> String.valueOf(entry.getKey().partition()),
+            entry -> new LongMsgOffset(entry.getValue()),
+            (existingValue, newValue) -> newValue
+        ));
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    super.close();
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/SynchronizedKafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/SynchronizedKafkaStreamMetadataProvider.java
new file mode 100644
index 00000000000..9733b0e4add
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/SynchronizedKafkaStreamMetadataProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+/**
+ * A thread-safe variant of {@link KafkaStreamMetadataProvider} that synchronizes
+ * access to metadata fetch operations.
+ * This is particularly useful when a shared instance of {@link KafkaStreamMetadataProvider}
+ * is accessed concurrently by multiple threads.
+ */
+public class SynchronizedKafkaStreamMetadataProvider extends KafkaStreamMetadataProvider {
+
+  public SynchronizedKafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+    super(clientId, streamConfig);
+  }
+
+  @Override
+  public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+    synchronized (this) {
+      return super.fetchLatestStreamOffset(partitionIds, timeoutMillis);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaAdminClientManagerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaAdminClientManagerTest.java
new file mode 100644
index 00000000000..89ee1b8857a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaAdminClientManagerTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.util.Properties;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.pinot.plugin.stream.kafka.KafkaAdminClientManager;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class KafkaAdminClientManagerTest {
+
+  @Test
+  public void testGetInstance() {
+    KafkaAdminClientManager instance1 = KafkaAdminClientManager.getInstance();
+    KafkaAdminClientManager instance2 = KafkaAdminClientManager.getInstance();
+    assertSame(instance1, instance2, "Should return the same singleton instance");
+  }
+
+  @Test
+  public void testCreateCacheKey() {
+    Properties props1 = new Properties();
+    props1.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props1.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+
+    Properties props2 = new Properties();
+    props2.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props2.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+
+    Properties props3 = new Properties();
+    props3.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
+    props3.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+
+    KafkaAdminClientManager manager = KafkaAdminClientManager.getInstance();
+
+    // Use reflection to access private method for testing
+    try {
+      java.lang.reflect.Method createCacheKeyMethod =
+          KafkaAdminClientManager.class.getDeclaredMethod("createCacheKey", Properties.class);
+      createCacheKeyMethod.setAccessible(true);
+
+      String key1 = (String) createCacheKeyMethod.invoke(manager, props1);
+      String key2 = (String) createCacheKeyMethod.invoke(manager, props2);
+      String key3 = (String) createCacheKeyMethod.invoke(manager, props3);
+
+      assertEquals(key1, key2, "Same properties should produce same cache key");
+      assertNotEquals(key1, key3, "Different bootstrap servers should produce different cache keys");
+    } catch (Exception e) {
+      fail("Failed to test createCacheKey method", e);
+    }
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testGetOrCreateAdminClientWithoutBootstrapServers() {
+    Properties props = new Properties();
+    // No bootstrap servers set
+    KafkaAdminClientManager.getInstance().getOrCreateAdminClient(props);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testGetOrCreateAdminClientWithEmptyBootstrapServers() {
+    Properties props = new Properties();
+    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+    KafkaAdminClientManager.getInstance().getOrCreateAdminClient(props);
+  }
+
+  @Test
+  public void testAdminClientReferenceClosing() {
+    Properties props = new Properties();
+    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+
+    KafkaAdminClientManager manager = KafkaAdminClientManager.getInstance();
+
+    // This test focuses on the reference mechanism without actually connecting to Kafka
+    // We test that multiple calls with same properties don't throw exceptions
+    try {
+      KafkaAdminClientManager.AdminClientReference ref1 = manager.getOrCreateAdminClient(props);
+      assertNotNull(ref1, "Reference should not be null");
+
+      // Closing a reference should not throw an exception
+      ref1.close();
+
+      // Closing again should not throw an exception
+      ref1.close();
+    } catch (Exception e) {
+      // This is expected since we're not connecting to a real Kafka cluster
+      // The test is mainly to verify the reference counting mechanism doesn't crash
+      assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+          || e.getMessage().contains("timeout") || e.getMessage().contains("refused"));
+    }
+  }
+
+  @Test
+  public void testAdminClientReferenceStateAfterClose() {
+    Properties props = new Properties();
+    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+
+    KafkaAdminClientManager manager = KafkaAdminClientManager.getInstance();
+
+    try {
+      KafkaAdminClientManager.AdminClientReference ref = manager.getOrCreateAdminClient(props);
+      ref.close();
+
+      // Should throw IllegalStateException when trying to use closed reference
+      assertThrows(IllegalStateException.class, () -> ref.getAdminClient());
+    } catch (Exception e) {
+      // Expected when no real Kafka cluster is available
+      assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+          || e.getMessage().contains("timeout") || e.getMessage().contains("refused"));
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandlerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandlerTest.java
new file mode 100644
index 00000000000..a7a2ac9ee7a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConnectionHandlerTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+public class KafkaPartitionLevelConnectionHandlerTest {
+
+  private static class TestableKafkaPartitionLevelConnectionHandler extends KafkaPartitionLevelConnectionHandler {
+    public TestableKafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
+      super(clientId, streamConfig, partition);
+    }
+  }
+
+  private StreamConfig createTestStreamConfig() {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", "kafka");
+    streamConfigMap.put("stream.kafka.topic.name", "testTopic");
+    streamConfigMap.put("stream.kafka.broker.list", "localhost:9092");
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", KafkaConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    return new StreamConfig("testTable_REALTIME", streamConfigMap);
+  }
+
+  @Test
+  public void testSharedAdminClientReference() {
+    StreamConfig streamConfig = createTestStreamConfig();
+
+    try {
+      TestableKafkaPartitionLevelConnectionHandler handler =
+          new TestableKafkaPartitionLevelConnectionHandler("testClient", streamConfig, 0);
+
+      // Test that we can call getOrCreateSharedAdminClient multiple times
+      // without throwing exceptions (even though it may fail to connect)
+      try {
+        handler.getOrCreateSharedAdminClient();
+        handler.getOrCreateSharedAdminClient(); // Should reuse the same reference
+      } catch (Exception e) {
+        // Expected when no real Kafka cluster is available
+        assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+            || e.getMessage().contains("timeout") || e.getMessage().contains("refused")
+            || e.getCause() != null);
+      }
+
+      // Test that close doesn't throw exceptions
+      handler.close();
+    } catch (Exception e) {
+      // Expected when initializing without a real Kafka cluster
+      assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+          || e.getMessage().contains("timeout") || e.getMessage().contains("refused")
+          || e.getCause() != null);
+    }
+  }
+
+  @Test
+  public void testGetOrCreateAdminClientBackwardCompatibility() {
+    StreamConfig streamConfig = createTestStreamConfig();
+
+    try {
+      TestableKafkaPartitionLevelConnectionHandler handler =
+          new TestableKafkaPartitionLevelConnectionHandler("testClient", streamConfig, 0);
+
+      // Test that the backward compatibility method still works
+      try {
+        handler.getOrCreateAdminClient();
+      } catch (Exception e) {
+        // Expected when no real Kafka cluster is available
+        assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+            || e.getMessage().contains("timeout") || e.getMessage().contains("refused")
+            || e.getCause() != null);
+      }
+
+      handler.close();
+    } catch (Exception e) {
+      // Expected when initializing without a real Kafka cluster
+      assertTrue(e.getMessage().contains("Connection") || e.getMessage().contains("Kafka")
+          || e.getMessage().contains("timeout") || e.getMessage().contains("refused")
+          || e.getCause() != null);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
new file mode 100644
index 00000000000..f602d083963
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
@@ -0,0 +1,534 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.plugin.stream.kafka40.utils.MiniKafkaCluster;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.retry.ExponentialBackoffRetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testng.Assert;
+import org.testng.SkipException;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for the KafkaPartitionLevelConsumer.
+ * Note: These tests require Docker to be running as they use Testcontainers.
+ */
+public class KafkaPartitionLevelConsumerTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
+  private static final long STABILIZE_SLEEP_DELAYS = 3000;
+  private static final String TEST_TOPIC_1 = "foo";
+  private static final String TEST_TOPIC_2 = "bar";
+  private static final String TEST_TOPIC_3 = "expired";
+  private static final int NUM_MSG_PRODUCED_PER_PARTITION = 1000;
+  private static final long TIMESTAMP = Instant.now().toEpochMilli();
+  private static final Random RANDOM = new Random();
+
+  private MiniKafkaCluster _kafkaCluster;
+  private String _kafkaBrokerAddress;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    // Check if Docker is available, skip tests if not
+    if (!isDockerAvailable()) {
+      throw new SkipException("Docker is not available. Skipping Kafka 4.0 consumer tests. "
+          + "These tests require Docker for Testcontainers.");
+    }
+    _kafkaCluster = new MiniKafkaCluster("0");
+    _kafkaCluster.start();
+    _kafkaBrokerAddress = _kafkaCluster.getKafkaServerAddress();
+    _kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
+    _kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
+    _kafkaCluster.createTopic(TEST_TOPIC_3, 1, 1);
+    Thread.sleep(STABILIZE_SLEEP_DELAYS);
+    produceMsgToKafka();
+    Thread.sleep(STABILIZE_SLEEP_DELAYS);
+    _kafkaCluster.deleteRecordsBeforeOffset(TEST_TOPIC_3, 0, 200);
+  }
+
+  /**
+   * Checks if Docker is available for running Testcontainers.
+   * @return true if Docker is available, false otherwise
+   */
+  private static boolean isDockerAvailable() {
+    try {
+      DockerClientFactory.instance().client();
+      return true;
+    } catch (Throwable ex) {
+      LOGGER.warn("Docker is not available: {}", ex.getMessage());
+      return false;
+    }
+  }
+
+  private void produceMsgToKafka() {
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _kafkaBrokerAddress);
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+      for (int i = 0; i < NUM_MSG_PRODUCED_PER_PARTITION; i++) {
+        producer.send(new ProducerRecord<>(TEST_TOPIC_1, 0, TIMESTAMP + i, null, "sample_msg_" + i));
+        // TEST_TOPIC_2 has 2 partitions
+        producer.send(new ProducerRecord<>(TEST_TOPIC_2, 0, TIMESTAMP + i, null, "sample_msg_" + i));
+        producer.send(new ProducerRecord<>(TEST_TOPIC_2, 1, TIMESTAMP + i, null, "sample_msg_" + i));
+        producer.send(new ProducerRecord<>(TEST_TOPIC_3, "sample_msg_" + i));
+      }
+      producer.flush();
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    try {
+      _kafkaCluster.deleteTopic(TEST_TOPIC_1);
+      _kafkaCluster.deleteTopic(TEST_TOPIC_2);
+      _kafkaCluster.deleteTopic(TEST_TOPIC_3);
+    } finally {
+      _kafkaCluster.close();
+    }
+  }
+
+  @Test
+  public void testBuildConsumer() {
+    String streamType = "kafka";
+    String streamKafkaTopicName = "theTopic";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String clientId = "clientId";
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfigMap.put("stream.kafka.fetcher.size", "10000");
+    streamConfigMap.put("stream.kafka.fetcher.minBytes", "20000");
+    StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+
+    // test default value
+    KafkaPartitionLevelConsumer kafkaSimpleStreamConsumer = createConsumer(clientId, streamConfig, 0);
+    kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
+
+    assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaBufferSize());
+    assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaSocketTimeout());
+
+    // test parsing values
+    assertEquals(10000, kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaFetcherSizeBytes());
+    assertEquals(20000, kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaFetcherMinBytes());
+
+    // test user defined values
+    streamConfigMap.put("stream.kafka.buffer.size", "100");
+    streamConfigMap.put("stream.kafka.socket.timeout", "1000");
+    streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+    kafkaSimpleStreamConsumer = createConsumer(clientId, streamConfig, 0);
+    kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
+    assertEquals(100, kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaBufferSize());
+    assertEquals(1000, kafkaSimpleStreamConsumer.getKafkaPartitionLevelStreamConfig().getKafkaSocketTimeout());
+  }
+
+  @Test
+  public void testGetPartitionCount() {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String clientId = "clientId";
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_1);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMap);
+
+    KafkaStreamMetadataProvider streamMetadataProvider = new 
KafkaStreamMetadataProvider(clientId, streamConfig);
+    assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 1);
+
+    streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_2);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+
+    streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, 
streamConfig);
+    assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+  }
+
+  @Test
+  public void testFetchMessages() {
+    String streamType = "kafka";
+    String streamKafkaTopicName = "theTopic";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String clientId = "clientId";
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMap);
+
+    int partition = 0;
+    KafkaPartitionLevelConsumer kafkaSimpleStreamConsumer = createConsumer(clientId, streamConfig, partition);
+    kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
+  }
+
+  @Test
+  public void testFetchOffsets() {
+    testFetchOffsets(TEST_TOPIC_1);
+    testFetchOffsets(TEST_TOPIC_2);
+  }
+
+  private void testFetchOffsets(String topic) {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String clientId = "clientId";
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", topic);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMap);
+
+    int numPartitions = new KafkaStreamMetadataProvider(clientId, streamConfig).fetchPartitionCount(10000);
+    for (int partition = 0; partition < numPartitions; partition++) {
+      KafkaStreamMetadataProvider kafkaStreamMetadataProvider =
+          new KafkaStreamMetadataProvider(clientId, streamConfig, partition);
+      assertEquals(new LongMsgOffset(0).compareTo(kafkaStreamMetadataProvider.fetchStreamPartitionOffset(
+          new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest(), 10000)), 0);
+      assertEquals(new LongMsgOffset(0).compareTo(kafkaStreamMetadataProvider.fetchStreamPartitionOffset(
+          new OffsetCriteria.OffsetCriteriaBuilder().withOffsetAsPeriod("2d"), 10000)), 0);
+      assertEquals(new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION).compareTo(
+          kafkaStreamMetadataProvider.fetchStreamPartitionOffset(
+              new OffsetCriteria.OffsetCriteriaBuilder().withOffsetAsTimestamp(Instant.now().toString()), 10000)), 0);
+      assertEquals(new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION).compareTo(
+          kafkaStreamMetadataProvider.fetchStreamPartitionOffset(
+              new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest(), 10000)), 0);
+    }
+  }
+
+  @Test
+  public void testConsumer()
+      throws Exception {
+    testConsumer(TEST_TOPIC_1);
+    testConsumer(TEST_TOPIC_2);
+  }
+
+  private void testConsumer(String topic)
+      throws TimeoutException {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String clientId = "clientId";
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", topic);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMap);
+
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    int numPartitions = new KafkaStreamMetadataProvider(clientId, streamConfig).fetchPartitionCount(10000);
+    for (int partition = 0; partition < numPartitions; partition++) {
+      PartitionGroupConsumer consumer = streamConsumerFactory.createPartitionGroupConsumer(clientId,
+          new PartitionGroupConsumptionStatus(partition, 0, new LongMsgOffset(0),
+              new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), "CONSUMING"));
+
+      // Test consume a large batch, only 500 records will be returned.
+      MessageBatch messageBatch = consumer.fetchMessages(new LongMsgOffset(0), 10000);
+      assertEquals(messageBatch.getMessageCount(), 500);
+      assertEquals(messageBatch.getUnfilteredMessageCount(), 500);
+      for (int i = 0; i < 500; i++) {
+        StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+        assertEquals(new String((byte[]) streamMessage.getValue()), "sample_msg_" + i);
+        StreamMessageMetadata metadata = streamMessage.getMetadata();
+        assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + i);
+        StreamPartitionMsgOffset offset = metadata.getOffset();
+        assertTrue(offset instanceof LongMsgOffset);
+        assertEquals(((LongMsgOffset) offset).getOffset(), i);
+        StreamPartitionMsgOffset nextOffset = metadata.getNextOffset();
+        assertTrue(nextOffset instanceof LongMsgOffset);
+        assertEquals(((LongMsgOffset) nextOffset).getOffset(), i + 1);
+      }
+      assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "500");
+      assertEquals(messageBatch.getFirstMessageOffset().toString(), "0");
+      assertEquals(messageBatch.getLastMessageMetadata().getOffset().toString(), "499");
+      assertEquals(messageBatch.getLastMessageMetadata().getNextOffset().toString(), "500");
+
+      // Test second half batch
+      messageBatch = consumer.fetchMessages(new LongMsgOffset(500), 10000);
+      assertEquals(messageBatch.getMessageCount(), 500);
+      assertEquals(messageBatch.getUnfilteredMessageCount(), 500);
+      for (int i = 0; i < 500; i++) {
+        StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+        assertEquals(new String((byte[]) streamMessage.getValue()), "sample_msg_" + (500 + i));
+        StreamMessageMetadata metadata = streamMessage.getMetadata();
+        assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 500 + i);
+        StreamPartitionMsgOffset offset = metadata.getOffset();
+        assertTrue(offset instanceof LongMsgOffset);
+        assertEquals(((LongMsgOffset) offset).getOffset(), 500 + i);
+        StreamPartitionMsgOffset nextOffset = metadata.getNextOffset();
+        assertTrue(nextOffset instanceof LongMsgOffset);
+        assertEquals(((LongMsgOffset) nextOffset).getOffset(), 501 + i);
+      }
+      assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "1000");
+      assertEquals(messageBatch.getFirstMessageOffset().toString(), "500");
+      assertEquals(messageBatch.getLastMessageMetadata().getOffset().toString(), "999");
+      assertEquals(messageBatch.getLastMessageMetadata().getNextOffset().toString(), "1000");
+
+      // Some random range
+      messageBatch = consumer.fetchMessages(new LongMsgOffset(10), 10000);
+      assertEquals(messageBatch.getMessageCount(), 500);
+      assertEquals(messageBatch.getUnfilteredMessageCount(), 500);
+      for (int i = 0; i < 500; i++) {
+        StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+        assertEquals(new String((byte[]) streamMessage.getValue()), "sample_msg_" + (10 + i));
+        StreamMessageMetadata metadata = streamMessage.getMetadata();
+        assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 10 + i);
+        StreamPartitionMsgOffset offset = metadata.getOffset();
+        assertTrue(offset instanceof LongMsgOffset);
+        assertEquals(((LongMsgOffset) offset).getOffset(), 10 + i);
+        StreamPartitionMsgOffset nextOffset = metadata.getNextOffset();
+        assertTrue(nextOffset instanceof LongMsgOffset);
+        assertEquals(((LongMsgOffset) nextOffset).getOffset(), 11 + i);
+      }
+      assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "510");
+      assertEquals(messageBatch.getFirstMessageOffset().toString(), "10");
+      assertEquals(messageBatch.getLastMessageMetadata().getOffset().toString(), "509");
+      assertEquals(messageBatch.getLastMessageMetadata().getNextOffset().toString(), "510");
+
+      // Some random range
+      messageBatch = consumer.fetchMessages(new LongMsgOffset(610), 10000);
+      assertEquals(messageBatch.getMessageCount(), 390);
+      assertEquals(messageBatch.getUnfilteredMessageCount(), 390);
+      for (int i = 0; i < 390; i++) {
+        StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+        assertEquals(new String((byte[]) streamMessage.getValue()), "sample_msg_" + (610 + i));
+        StreamMessageMetadata metadata = streamMessage.getMetadata();
+        assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 610 + i);
+        StreamPartitionMsgOffset offset = metadata.getOffset();
+        assertTrue(offset instanceof LongMsgOffset);
+        assertEquals(((LongMsgOffset) offset).getOffset(), 610 + i);
+        StreamPartitionMsgOffset nextOffset = metadata.getNextOffset();
+        assertTrue(nextOffset instanceof LongMsgOffset);
+        assertEquals(((LongMsgOffset) nextOffset).getOffset(), 611 + i);
+      }
+      assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "1000");
+      assertEquals(messageBatch.getFirstMessageOffset().toString(), "610");
+      assertEquals(messageBatch.getLastMessageMetadata().getOffset().toString(), "999");
+      assertEquals(messageBatch.getLastMessageMetadata().getNextOffset().toString(), "1000");
+    }
+  }
+
+  protected String getKafkaConsumerFactoryName() {
+    return KafkaConsumerFactory.class.getName();
+  }
+
+  @Test
+  public void testOffsetsExpired()
+      throws TimeoutException {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", "kafka");
+    streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_3);
+    streamConfigMap.put("stream.kafka.broker.list", _kafkaBrokerAddress);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfigMap.put("auto.offset.reset", "earliest");
+    StreamConfig streamConfig = new StreamConfig("tableName_REALTIME", 
streamConfigMap);
+
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    PartitionGroupConsumer consumer = streamConsumerFactory.createPartitionGroupConsumer("clientId",
+        new PartitionGroupConsumptionStatus(0, 0, new LongMsgOffset(0),
+            new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), "CONSUMING"));
+
+    // Start offset has expired. Automatically reset to earliest available and fetch whatever available
+    MessageBatch messageBatch = consumer.fetchMessages(new LongMsgOffset(0), 10000);
+    assertEquals(messageBatch.getMessageCount(), 500);
+    assertEquals(messageBatch.getUnfilteredMessageCount(), 500);
+    for (int i = 0; i < 500; i++) {
+      assertEquals(new String((byte[]) messageBatch.getStreamMessage(i).getValue()), "sample_msg_" + (200 + i));
+    }
+    assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "700");
+  }
+
+  @Test
+  public void testListTopics() {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String clientId = "clientId";
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", "NON_EXISTING_TOPIC");
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMap);
+
+    KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig);
+    List<StreamMetadataProvider.TopicMetadata> topics = streamMetadataProvider.getTopics();
+    List<String> topicNames = topics.stream()
+        .map(StreamMetadataProvider.TopicMetadata::getName)
+        .collect(Collectors.toList());
+    assertTrue(topicNames.containsAll(List.of(TEST_TOPIC_1, TEST_TOPIC_2, TEST_TOPIC_3)));
+  }
+
+  @Test
+  void testBatchSizeInBytesIsCalculatedCorrectly() {
+    TopicPartition topicPartition = new TopicPartition("test-topic", 0);
+
+    class FakeKafkaPartitionLevelConsumer extends KafkaPartitionLevelConsumer {
+
+      public FakeKafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+        super(clientId, streamConfig, partition);
+      }
+
+      @Override
+      protected Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+        Consumer<String, Bytes> mockConsumer = mock(Consumer.class);
+        // Mock records using Kafka 4.x compatible constructor (headers cannot be null in Kafka 4.x)
+        ConsumerRecord<String, Bytes> record1 =
+            new ConsumerRecord<>("test-topic", 0, 0L, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, 4,
+                5, "key1", new Bytes("value1".getBytes(StandardCharsets.UTF_8)), new RecordHeaders(), null);
+        ConsumerRecord<String, Bytes> record2 =
+            new ConsumerRecord<>("test-topic", 0, 0L, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, 4,
+                9, "key2", new Bytes("value2".getBytes(StandardCharsets.UTF_8)), new RecordHeaders(), null);
+        ConsumerRecord<String, Bytes> record3 =
+            new ConsumerRecord<>("test-topic", 0, 0L, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, 4,
+                -1, "key2", new Bytes("value2".getBytes(StandardCharsets.UTF_8)), new RecordHeaders(), null);
+        // Mock return of poll()
+        ConsumerRecords<String, Bytes> consumerRecords = new ConsumerRecords<>(
+            Map.of(topicPartition, List.of(record1, record2, record3))
+        );
+        when(mockConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+        return mockConsumer;
+      }
+    }
+
+    FakeKafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
+        new FakeKafkaPartitionLevelConsumer("clientId-test", getStreamConfig("test-topic"), 0);
+    KafkaMessageBatch kafkaMessageBatch = kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
+    Assert.assertEquals(kafkaMessageBatch.getSizeInBytes(), 14);
+  }
+
+  @Test
+  public void testFetchLatestStreamOffset()
+      throws IOException {
+    StreamConfig streamConfig = getStreamConfig(TEST_TOPIC_2);
+    try (KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider("clientId",
+        streamConfig)) {
+      Set<Integer> partitions = new HashSet<>();
+      partitions.add(0);
+      partitions.add(1);
+      Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap =
+          streamMetadataProvider.fetchLatestStreamOffset(partitions, 1000);
+      Assert.assertEquals(((LongMsgOffset) (partitionMsgOffsetMap.get(0))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+      Assert.assertEquals(((LongMsgOffset) (partitionMsgOffsetMap.get(1))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+    }
+  }
+
+  private KafkaPartitionLevelConsumer createConsumer(String clientId, StreamConfig streamConfig, int partition) {
+    if (RANDOM.nextDouble() < 0.5) {
+      return new KafkaPartitionLevelConsumer(clientId, streamConfig, partition);
+    } else {
+      return new KafkaPartitionLevelConsumer(clientId, streamConfig, partition,
+          new ExponentialBackoffRetryPolicy(2, 1000, 1.1));
+    }
+  }
+
+  private StreamConfig getStreamConfig(String topicName) {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", topicName);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+
+    return new StreamConfig(tableNameWithType, streamConfigMap);
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelStreamConfigTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelStreamConfigTest.java
new file mode 100644
index 00000000000..586b8e0bc91
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelStreamConfigTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.plugin.stream.kafka.KafkaPartitionLevelStreamConfig;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class KafkaPartitionLevelStreamConfigTest {
+  private static final String KAFKA_DECODER_CLASS_NAME =
+      "org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder";
+
+  private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+      String socketTimeout) {
+    return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null, null);
+  }
+
+  private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+      String socketTimeout, String isolationLevel) {
+    return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null, isolationLevel);
+  }
+
+  private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+      String socketTimeout, String fetcherSize, String fetcherMinBytes, String isolationLevel) {
+    return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, fetcherSize, fetcherMinBytes, isolationLevel,
+        null);
+  }
+
+  private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+      String socketTimeout, String fetcherSize, String fetcherMinBytes, String isolationLevel,
+      String populateRowMetadata) {
+    return new KafkaPartitionLevelStreamConfig(
+        getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, fetcherSize, fetcherMinBytes, isolationLevel,
+            populateRowMetadata, "tableName_REALTIME"));
+  }
+
+  private StreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, String socketTimeout,
+      String fetcherSize, String fetcherMinBytes, String isolationLevel, String populateRowMetadata,
+      String tableNameWithType) {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    String streamType = "kafka";
+    String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
+    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClassName);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+        KAFKA_DECODER_CLASS_NAME);
+    streamConfigMap.put("stream.kafka.broker.list", bootstrapHosts);
+    if (buffer != null) {
+      streamConfigMap.put("stream.kafka.buffer.size", buffer);
+    }
+    if (socketTimeout != null) {
+      streamConfigMap.put("stream.kafka.socket.timeout", socketTimeout);
+    }
+    if (fetcherSize != null) {
+      streamConfigMap.put("stream.kafka.fetcher.size", fetcherSize);
+    }
+    if (fetcherMinBytes != null) {
+      streamConfigMap.put("stream.kafka.fetcher.minBytes", fetcherMinBytes);
+    }
+    if (isolationLevel != null) {
+      streamConfigMap.put("stream.kafka.isolation.level", isolationLevel);
+    }
+    if (populateRowMetadata != null) {
+      streamConfigMap.put("stream.kafka.metadata.populate", 
populateRowMetadata);
+    }
+    return new StreamConfig(tableNameWithType, streamConfigMap);
+  }
+
+  @Test
+  public void testGetKafkaIsolationLevel() {
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "", "", "", "read_committed");
+    Assert.assertEquals("read_committed", config.getKafkaIsolationLevel());
+  }
+
+  @Test
+  public void testGetKafkaTopicName() {
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "", "", "");
+    Assert.assertEquals("topic", config.getKafkaTopicName());
+  }
+
+  @Test
+  public void testGetBootstrapHosts() {
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals("host1", config.getBootstrapHosts());
+  }
+
+  @Test
+  public void testGetKafkaBufferSize() {
+    // test default
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", null, "");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaBufferSize());
+
+    config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaBufferSize());
+
+    config = getStreamConfig("topic", "host1", "bad value", "");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaBufferSize());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "100", "");
+    Assert.assertEquals(100, config.getKafkaBufferSize());
+  }
+
+  @Test
+  public void testGetKafkaSocketTimeout() {
+    // test default
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", null);
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        config.getKafkaSocketTimeout());
+
+    config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        config.getKafkaSocketTimeout());
+
+    config = getStreamConfig("topic", "host1", "", "bad value");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        config.getKafkaSocketTimeout());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "", "100");
+    Assert.assertEquals(100, config.getKafkaSocketTimeout());
+  }
+
+  @Test
+  public void testGetFetcherSize() {
+    // test default
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null, null);
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaFetcherSizeBytes());
+
+    config = getStreamConfig("topic", "host1", "100", "", "", null, null);
+    Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+    config = getStreamConfig("topic", "host1", "100", "", "bad value", null, 
null);
+    Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "100", "", "200", null, null);
+    Assert.assertEquals(200, config.getKafkaFetcherSizeBytes());
+  }
+
+  @Test
+  public void testGetFetcherMinBytes() {
+    // test default
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null, null);
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    config = getStreamConfig("topic", "host1", "", "", "", "", null);
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    config = getStreamConfig("topic", "host1", "", "", "", "bad value", null);
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "", "", "", "100", null);
+    Assert.assertEquals(100, config.getKafkaFetcherMinBytes());
+  }
+
+  @Test
+  public void testIsPopulateRowMetadata() {
+    // test default
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", null, null, null, null, null, null);
+    Assert.assertFalse(config.isPopulateMetadata());
+
+    config = getStreamConfig("topic", "host1", null, null, null, null, null, 
"bad value");
+    Assert.assertFalse(config.isPopulateMetadata());
+
+    config = getStreamConfig("topic", "host1", null, null, null, null, null, 
"TrUe");
+    Assert.assertTrue(config.isPopulateMetadata());
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/utils/MiniKafkaCluster.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/utils/MiniKafkaCluster.java
new file mode 100644
index 00000000000..03535d2ee81
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/utils/MiniKafkaCluster.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka40.utils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.testcontainers.kafka.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+
+/**
+ * MiniKafkaCluster for Kafka 4.x using Testcontainers with KRaft mode (no ZooKeeper).
+ * Uses the apache/kafka image (see KAFKA_IMAGE below).
+ */
+public final class MiniKafkaCluster implements Closeable {
+
+  private static final String KAFKA_IMAGE = "apache/kafka:4.0.0";
+
+  private final KafkaContainer _kafkaContainer;
+
+  public MiniKafkaCluster(String brokerId)
+      throws Exception {
+    // Use apache/kafka image which supports KRaft mode
+    _kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
+  }
+
+  public void start()
+      throws Exception {
+    _kafkaContainer.start();
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _kafkaContainer.stop();
+  }
+
+  public String getKafkaServerAddress() {
+    return _kafkaContainer.getBootstrapServers();
+  }
+
+  private AdminClient getOrCreateAdminClient() {
+    Properties kafkaClientConfig = new Properties();
+    kafkaClientConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaServerAddress());
+    return AdminClient.create(kafkaClientConfig);
+  }
+
+  public void createTopic(String topicName, int numPartitions, int replicationFactor)
+      throws ExecutionException, InterruptedException {
+    try (AdminClient adminClient = getOrCreateAdminClient()) {
+      NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) replicationFactor);
+      int retries = 5;
+      while (retries > 0) {
+        try {
+          adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
+          return;
+        } catch (ExecutionException e) {
+          if (e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException) {
+            retries--;
+            TimeUnit.SECONDS.sleep(1);
+          } else {
+            throw e;
+          }
+        }
+      }
+      throw new ExecutionException("Failed to create topic after retries", null);
+    }
+  }
+
+  public void deleteTopic(String topicName)
+      throws ExecutionException, InterruptedException {
+    try (AdminClient adminClient = getOrCreateAdminClient()) {
+      adminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
+    }
+  }
+
+  public void deleteRecordsBeforeOffset(String topicName, int partitionId, long offset)
+      throws ExecutionException, InterruptedException {
+    try (AdminClient adminClient = getOrCreateAdminClient()) {
+      Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
+      recordsToDelete.put(new TopicPartition(topicName, partitionId), RecordsToDelete.beforeOffset(offset));
+      // Wait for the deletion to complete
+      adminClient.deleteRecords(recordsToDelete).all().get();
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/resources/log4j2.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/resources/log4j2.xml
new file mode 100644
index 00000000000..439331f9d7b
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/resources/log4j2.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration>
+  <Appenders>
+    <Console name="console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Logger name="org.apache.pinot" level="warn" additivity="false">
+      <AppenderRef ref="console"/>
+    </Logger>
+    <Root level="error">
+      <AppenderRef ref="console"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/pinot-plugins/pinot-stream-ingestion/pom.xml b/pinot-plugins/pinot-stream-ingestion/pom.xml
index 58a62ebb789..87e9760d4c2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pom.xml
@@ -39,6 +39,7 @@
     <module>pinot-kafka-base</module>
     <module>pinot-kafka-2.0</module>
     <module>pinot-kafka-3.0</module>
+    <module>pinot-kafka-4.0</module>
     <module>pinot-kinesis</module>
     <module>pinot-pulsar</module>
   </modules>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
index 595c0638e08..37abe9b03f3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
@@ -95,6 +95,7 @@ public class PluginManager {
       put("org.apache.pinot.filesystem.LocalPinotFS", 
"org.apache.pinot.spi.filesystem.LocalPinotFS");
 
       // StreamConsumerFactory
+      // Old-style class names mapped to current plugin packages
       put("org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
           "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory");
       put("org.apache.pinot.core.realtime.impl.kafka3.KafkaConsumerFactory",
diff --git a/pom.xml b/pom.xml
index 5833c56d11a..8c5c8d81da6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,6 +197,7 @@
     <spark3.version>3.5.8</spark3.version>
     <kafka2.version>2.8.2</kafka2.version>
     <kafka3.version>3.9.1</kafka3.version>
+    <kafka4.version>4.1.1</kafka4.version>
     <confluent.version>7.9.5</confluent.version>
     <pulsar.version>4.0.8</pulsar.version>
     <flink.version>1.20.3</flink.version>
@@ -684,6 +685,11 @@
         <artifactId>pinot-kafka-3.0</artifactId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.pinot</groupId>
+        <artifactId>pinot-kafka-4.0</artifactId>
+        <version>${project.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.pinot</groupId>
         <artifactId>pinot-kinesis</artifactId>

