This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cdd16fef4d Prevent NPE when attempt to fetch partition information fails (#11769)
cdd16fef4d is described below

commit cdd16fef4d6bff6a7caec3d997c0a45190bd0ff3
Author: Ragesh Rajagopalan <[email protected]>
AuthorDate: Tue Oct 10 17:33:40 2023 -0700

    Prevent NPE when attempt to fetch partition information fails (#11769)
---
 .../pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 022b38273e..3e8928da12 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -24,8 +24,11 @@ import java.time.Clock;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import org.apache.pinot.spi.stream.LongMsgOffset;
@@ -57,7 +60,11 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
   public int fetchPartitionCount(long timeoutMillis) {
     try {
-      return _consumer.partitionsFor(_topic, 
Duration.ofMillis(timeoutMillis)).size();
+      List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic, 
Duration.ofMillis(timeoutMillis));
+      if (CollectionUtils.isNotEmpty(partitionInfos)) {
+        return partitionInfos.size();
+      }
+      throw new RuntimeException(String.format("Failed to fetch partition 
information for topic: %s", _topic));
     } catch (TimeoutException e) {
       throw new TransientConsumerException(e);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to