This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cdd16fef4d Prevent NPE when attempt to fetch partition information fails (#11769)
cdd16fef4d is described below
commit cdd16fef4d6bff6a7caec3d997c0a45190bd0ff3
Author: Ragesh Rajagopalan <[email protected]>
AuthorDate: Tue Oct 10 17:33:40 2023 -0700
Prevent NPE when attempt to fetch partition information fails (#11769)
---
.../pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 022b38273e..3e8928da12 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -24,8 +24,11 @@ import java.time.Clock;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
@@ -57,7 +60,11 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
@Override
public int fetchPartitionCount(long timeoutMillis) {
try {
- return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
+ List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis));
+ if (CollectionUtils.isNotEmpty(partitionInfos)) {
+ return partitionInfos.size();
+ }
+ throw new RuntimeException(String.format("Failed to fetch partition information for topic: %s", _topic));
} catch (TimeoutException e) {
throw new TransientConsumerException(e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]