sjvanrossum commented on code in PR #33031:
URL: https://github.com/apache/beam/pull/33031#discussion_r1838541133


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -520,12 +522,14 @@ String name() {
     List<TopicPartition> partitions =
         
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
 
-    // Each source has a single unique topic.
     for (TopicPartition topicPartition : partitions) {
-      this.kafkaTopic = topicPartition.topic();
-      break;
+      this.kafkaTopics.add(topicPartition.topic());

Review Comment:
   Instead of initializing `kafkaTopics` to an empty set and then appending to 
it, would it make sense to declare it and move the assignment to here?
   ```
   this.kafkaTopics = 
partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
   ```



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -520,12 +522,14 @@ String name() {
     List<TopicPartition> partitions =
         
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
 
-    // Each source has a single unique topic.
     for (TopicPartition topicPartition : partitions) {
-      this.kafkaTopic = topicPartition.topic();
-      break;
+      this.kafkaTopics.add(topicPartition.topic());
     }
 
+    LOG.info(
+        "Reader {} is reading from topics {}",
+        this.name,
+        kafkaTopics.stream().collect(Collectors.joining(", ", "{", "}")));

Review Comment:
   The default `toString` implementation for a `Collection` would return `[e1, 
e2, e3]`, but this is fine if you want exactly the specified formatting. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to