aromanenko-dev commented on a change in pull request #14217:
URL: https://github.com/apache/beam/pull/14217#discussion_r593368217



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
##########
@@ -66,7 +66,12 @@
     if (partitions.isEmpty()) {
       try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
         for (String topic : spec.getTopics()) {
-          for (PartitionInfo p : consumer.partitionsFor(topic)) {
+          List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);

Review comment:
       I checked this against a local Kafka cluster with 
`auto.create.topics.enable=false`. When I try to read from a non-existing 
topic, `consumer.partitionsFor(topic)` returns null, and iterating over that 
result throws an NPE.
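
To illustrate the null check being discussed, here is a minimal, dependency-free sketch. It is not the code from this PR: `PartitionLookup`, `collectPartitions`, and the `lookup` function are hypothetical names, with `lookup` standing in for `consumer::partitionsFor` so the example runs without a Kafka client. The choice to throw `IllegalStateException` and the wording of the message are also assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PartitionLookup {

  /**
   * Collects the partitions of every topic and fails fast on a missing topic.
   * `lookup` is a hypothetical stand-in for consumer::partitionsFor.
   */
  public static <P> List<P> collectPartitions(
      List<String> topics, Function<String, List<P>> lookup) {
    List<P> result = new ArrayList<>();
    for (String topic : topics) {
      List<P> partitionInfoList = lookup.apply(topic);
      // The lookup may return null for an unknown topic, so check before iterating.
      if (partitionInfoList == null || partitionInfoList.isEmpty()) {
        throw new IllegalStateException(
            "Could not find any partitions for topic " + topic
                + ". Please check that the topic exists.");
      }
      result.addAll(partitionInfoList);
    }
    return result;
  }

  public static void main(String[] args) {
    // Fake lookup: knows one topic, returns null for everything else.
    Function<String, List<Integer>> lookup =
        topic -> "existing".equals(topic) ? List.of(0, 1, 2) : null;

    System.out.println(collectPartitions(List.of("existing"), lookup));
    try {
      collectPartitions(List.of("missing"), lookup);
    } catch (IllegalStateException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

Failing with a descriptive exception rather than silently skipping the topic is one possible design choice; it surfaces a misconfigured topic name at source-split time instead of producing an empty read.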




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

