sjvanrossum commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1212769417


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr
 
       @VisibleForTesting final @Nullable List<String> topics;
 
+      private final @Nullable Pattern topicPattern;
+
       @ProcessElement
       public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
         List<TopicPartition> partitions =
             new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions));
         if (partitions.isEmpty()) {
           try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
-            for (String topic : Preconditions.checkStateNotNull(topics)) {
-              for (PartitionInfo p : consumer.partitionsFor(topic)) {
-                partitions.add(new TopicPartition(p.topic(), p.partition()));
+            List<String> topics = Preconditions.checkStateNotNull(this.topics);
+            if (topics.isEmpty()) {
+              Pattern pattern = Preconditions.checkStateNotNull(topicPattern);
+              for (Map.Entry<String, List<PartitionInfo>> entry :
+                  consumer.listTopics().entrySet()) {
+                if (pattern.matcher(entry.getKey()).matches()) {
+                  for (PartitionInfo p : entry.getValue()) {
+                    partitions.add(new TopicPartition(p.topic(), p.partition()));
+                  }
+                }
+              }
+            } else {
+              for (String topic : topics) {
+                for (PartitionInfo p : consumer.partitionsFor(topic)) {

Review Comment:
   Is this referring to `topics`? Both `topicPartitions` and `topics` are initialized to empty lists by the builder and are replaced via `.withTopicPartitions()` and `.withTopics()`, respectively, so the argument to the previous `Preconditions.checkStateNotNull(topics)` call in the for loop should never be null. Special care should be taken to carry that invariant forward when we add support for this property to KafkaIO's `ExternalTransformRegistrar`, though, since the registrar doesn't guarantee the same object state that the builder does.
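A minimal sketch of the non-null invariant described above, assuming Java 10+ for `List.copyOf`. `SourceSpec` and `fromExternalConfig` are hypothetical names, not KafkaIO's actual AutoValue builder or registrar code, and topic partitions are modelled as plain strings to keep the example self-contained. The builder path defaults both lists to empty lists, and the external-config path normalizes absent (null) fields to empty lists, which is the property a registrar would need to preserve.

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified stand-in for a KafkaIO read configuration. */
final class SourceSpec {
  private final List<String> topics;
  private final List<String> topicPartitions;

  private SourceSpec(List<String> topics, List<String> topicPartitions) {
    // Both fields are always non-null; an empty list means "not set".
    this.topics = Objects.requireNonNull(topics);
    this.topicPartitions = Objects.requireNonNull(topicPartitions);
  }

  /** Builder-style default: both lists start out empty, never null. */
  static SourceSpec defaults() {
    return new SourceSpec(Collections.emptyList(), Collections.emptyList());
  }

  /** Replaces the topics list, as a withTopics()-style setter would. */
  SourceSpec withTopics(List<String> newTopics) {
    return new SourceSpec(List.copyOf(newTopics), topicPartitions);
  }

  /**
   * Hypothetical factory for configuration arriving from an external (cross-language)
   * payload, where absent fields may be null. Mapping null to an empty list keeps the
   * invariant that the builder path guarantees.
   */
  static SourceSpec fromExternalConfig(List<String> topics, List<String> topicPartitions) {
    return new SourceSpec(
        topics == null ? Collections.emptyList() : List.copyOf(topics),
        topicPartitions == null ? Collections.emptyList() : List.copyOf(topicPartitions));
  }

  List<String> topics() {
    return topics;
  }

  List<String> topicPartitions() {
    return topicPartitions;
  }
}
```

Normalizing at construction time means downstream code such as `processElement` can keep treating "empty" as "unset" without a separate null branch.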
   Regarding `topicPattern`: if both `topicPartitions` and `topics` are empty, then `topicPattern` must be non-null, since the PTransform's expansion checks that at least one of those properties is set and the `.withX()` builder methods check that none of them was set previously.
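To illustrate why `topicPattern` must be non-null in the pattern branch, here is a hypothetical `ReadSpec` model (not KafkaIO's API). Each `withX()` method rejects the call if any topic setting is already present, and `validate()` stands in for the expansion-time check that at least one is set. Under those two assumed checks, empty `topics` and `topicPartitions` leave only the pattern.

```java
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical, simplified model of three mutually exclusive topic settings. */
final class ReadSpec {
  private final List<String> topics;
  private final List<String> topicPartitions;
  private final Pattern topicPattern; // null means "not set"

  private ReadSpec(List<String> topics, List<String> topicPartitions, Pattern topicPattern) {
    this.topics = topics;
    this.topicPartitions = topicPartitions;
    this.topicPattern = topicPattern;
  }

  static ReadSpec create() {
    return new ReadSpec(Collections.emptyList(), Collections.emptyList(), null);
  }

  /** Mirrors the .withX() checks: no topic setting may have been set before. */
  private void checkNoneSet() {
    if (!topics.isEmpty() || !topicPartitions.isEmpty() || topicPattern != null) {
      throw new IllegalStateException("Only one of topics, topicPartitions, topicPattern may be set");
    }
  }

  ReadSpec withTopics(List<String> t) {
    checkNoneSet();
    return new ReadSpec(List.copyOf(t), topicPartitions, topicPattern);
  }

  ReadSpec withTopicPartitions(List<String> tp) {
    checkNoneSet();
    return new ReadSpec(topics, List.copyOf(tp), topicPattern);
  }

  ReadSpec withTopicPattern(String regex) {
    checkNoneSet();
    return new ReadSpec(topics, topicPartitions, Pattern.compile(regex));
  }

  /** Mirrors the expansion-time check: at least one setting must be present. */
  ReadSpec validate() {
    if (topics.isEmpty() && topicPartitions.isEmpty() && topicPattern == null) {
      throw new IllegalStateException("One of topics, topicPartitions, topicPattern must be set");
    }
    return this;
  }

  /** After validate(), empty topics and topicPartitions imply a non-null pattern. */
  Pattern patternWhenListsEmpty() {
    if (!topics.isEmpty() || !topicPartitions.isEmpty()) {
      throw new IllegalStateException("A topic list is set; the pattern is not used");
    }
    if (topicPattern == null) {
      throw new IllegalStateException("Unreachable after validate()");
    }
    return topicPattern;
  }
}
```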
   
   As for Kafka's topic metadata, `.partitionsFor()` throws an exception if an unauthorized topic is requested, and `.listTopics()` lists only authorized topics. Both methods return initialized objects: potential null responses from the server are translated into empty collections (as far back as `org.apache.kafka:kafka-clients:0.11.0.3`), and an authorization failure results in an exception. I'd say that the existing null check on `partitionInfoList` seems superfluous and could be considered for deletion:
   ```
   List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
   checkState(
       partitionInfoList != null,
       "Could not find any partitions info. Please check Kafka configuration and make sure "
           + "that provided topics exist.");
   for (PartitionInfo p : partitionInfoList) {
     partitions.add(new TopicPartition(p.topic(), p.partition()));
   }
   ```
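A Kafka-free sketch of the two discovery branches in the diff, under stated assumptions: the `Map<String, List<Integer>>` stands in for the result of `consumer.listTopics()`, the `Function` stands in for `consumer.partitionsFor(topic)` and is assumed never to return null, and partitions are rendered as hypothetical `"topic-N"` strings instead of `TopicPartition` objects. `PartitionDiscovery` is an illustrative name only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.regex.Pattern;

/** Hypothetical sketch of the partition-discovery logic, without a Kafka dependency. */
final class PartitionDiscovery {

  /** Pattern branch: keep every partition of every listed topic whose name fully matches. */
  static List<String> byPattern(Map<String, List<Integer>> topicMetadata, Pattern pattern) {
    List<String> partitions = new ArrayList<>();
    // Sorted copy only for deterministic output; the real metadata map has no defined order.
    for (Map.Entry<String, List<Integer>> entry : new TreeMap<>(topicMetadata).entrySet()) {
      // matches() requires the whole topic name to match, unlike find().
      if (pattern.matcher(entry.getKey()).matches()) {
        for (int p : entry.getValue()) {
          partitions.add(entry.getKey() + "-" + p);
        }
      }
    }
    return partitions;
  }

  /**
   * Explicit-topics branch with no null check: assumes the lookup returns a non-null
   * (possibly empty) list or throws, as described for partitionsFor() above.
   */
  static List<String> byTopics(List<String> topics, Function<String, List<Integer>> partitionsFor) {
    List<String> partitions = new ArrayList<>();
    for (String topic : topics) {
      for (int p : partitionsFor.apply(topic)) {
        partitions.add(topic + "-" + p);
      }
    }
    return partitions;
  }
}
```

Because the pattern branch uses `Matcher.matches()`, the pattern must match the whole topic name: `Pattern.compile("events")` selects `events` but not `events-eu`, whereas `find()` would select both.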


