This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2f76e37196 Reuse Kafka admin client for better performance (#16129)
2f76e37196 is described below
commit 2f76e371968152fd428e08b2e66b196328920786
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Jun 20 10:25:39 2025 +0530
Reuse Kafka admin client for better performance (#16129)
Co-authored-by: KKCorps <[email protected]>
---
.../plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java | 3 +++
.../pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java | 6 +++---
.../plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java | 2 ++
.../pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java | 6 +++---
4 files changed, 11 insertions(+), 6 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index ea0a5093e8..fb7052f6ac 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -56,6 +56,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
protected final Consumer<String, Bytes> _consumer;
protected final TopicPartition _topicPartition;
protected final Properties _consumerProp;
+ protected final AdminClient _adminClient;
public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig
streamConfig, int partition) {
_config = new KafkaPartitionLevelStreamConfig(streamConfig);
@@ -67,6 +68,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
_consumer = createConsumer(_consumerProp);
_topicPartition = new TopicPartition(_topic, _partition);
_consumer.assign(Collections.singletonList(_topicPartition));
+ _adminClient = createAdminClient();
}
private Properties buildProperties(StreamConfig streamConfig) {
@@ -116,6 +118,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
public void close()
throws IOException {
_consumer.close();
+ _adminClient.close();
}
@VisibleForTesting
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index a584a0ee59..0d963ea65b 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -100,7 +100,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
- try (AdminClient adminClient = createAdminClient()) {
+ try {
// Build the offset spec request for this partition
Map<TopicPartition, OffsetSpec> request = new HashMap<>();
if (offsetCriteria.isLargest()) {
@@ -117,13 +117,13 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
throw new IllegalArgumentException("Unknown offset criteria: " +
offsetCriteria);
}
// Query via AdminClient (thread-safe)
- ListOffsetsResult result = adminClient.listOffsets(request);
+ ListOffsetsResult result = _adminClient.listOffsets(request);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() ||
offsetCriteria.isPeriod())) {
// fetch endOffsets as fallback
request.put(_topicPartition, OffsetSpec.latest());
- result = adminClient.listOffsets(request);
+ result = _adminClient.listOffsets(request);
offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
LOGGER.warn(
"initial offset type is {} and its value evaluates to null hence
proceeding with offset {} " + "for "
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
index 92ee657a5a..81690b5380 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
@@ -55,6 +55,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
protected final Consumer<String, Bytes> _consumer;
protected final TopicPartition _topicPartition;
protected final Properties _consumerProp;
+ protected final AdminClient _adminClient;
public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig
streamConfig, int partition) {
_config = new KafkaPartitionLevelStreamConfig(streamConfig);
@@ -66,6 +67,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
_consumer = createConsumer(_consumerProp);
_topicPartition = new TopicPartition(_topic, _partition);
_consumer.assign(Collections.singletonList(_topicPartition));
+ _adminClient = createAdminClient();
}
private Properties buildProperties(StreamConfig streamConfig) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 65c803804b..c3e9aadbbc 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -100,7 +100,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
- try (AdminClient adminClient = createAdminClient()) {
+ try {
// Build the offset spec request for this partition
Map<TopicPartition, OffsetSpec> request = new HashMap<>();
if (offsetCriteria.isLargest()) {
@@ -117,13 +117,13 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
throw new IllegalArgumentException("Unknown offset criteria: " +
offsetCriteria);
}
// Query via AdminClient (thread-safe)
- ListOffsetsResult result = adminClient.listOffsets(request);
+ ListOffsetsResult result = _adminClient.listOffsets(request);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() ||
offsetCriteria.isPeriod())) {
// fetch endOffsets as fallback
request.put(_topicPartition, OffsetSpec.latest());
- result = adminClient.listOffsets(request);
+ result = _adminClient.listOffsets(request);
offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
LOGGER.warn(
"initial offset type is {} and its value evaluates to null hence
proceeding with offset {} " + "for "
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]