sjvanrossum commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1212769417
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr
@VisibleForTesting final @Nullable List<String> topics;
+ private final @Nullable Pattern topicPattern;
+
@ProcessElement
public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
List<TopicPartition> partitions =
new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions));
if (partitions.isEmpty()) {
try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
- for (String topic : Preconditions.checkStateNotNull(topics)) {
- for (PartitionInfo p : consumer.partitionsFor(topic)) {
- partitions.add(new TopicPartition(p.topic(), p.partition()));
+ List<String> topics = Preconditions.checkStateNotNull(this.topics);
+ if (topics.isEmpty()) {
+ Pattern pattern = Preconditions.checkStateNotNull(topicPattern);
+ for (Map.Entry<String, List<PartitionInfo>> entry :
+ consumer.listTopics().entrySet()) {
+ if (pattern.matcher(entry.getKey()).matches()) {
+ for (PartitionInfo p : entry.getValue()) {
+ partitions.add(new TopicPartition(p.topic(), p.partition()));
+ }
+ }
+ }
+ } else {
+ for (String topic : topics) {
+ for (PartitionInfo p : consumer.partitionsFor(topic)) {
Review Comment:
Is this referring to `topics`? Both `topicPartitions` and `topics` are
initialized as empty lists in the builder by default and are replaced by
`.withTopicPartitions()` and `.withTopics()` respectively, so the previous
`Preconditions.checkStateNotNull(topics)` expression in the for loop should
never see null under any circumstance. We should take special care to carry
that property forward when we add support for this property in KafkaIO's
ExternalTransformRegistrar, though, since it doesn't guarantee the same object
state that the builder guarantees.
With regard to `topicPattern`: if both `topicPartitions` and `topics` are
empty, then `topicPattern` must be non-null, since the PTransform's expansion
checks that at least one of those properties is set and the `.withX()` builder
methods check that none of them has been set already.
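To make that invariant concrete, here is a minimal sketch of the two checks described above. `TopicSelection` and all of its members are hypothetical stand-ins written for illustration, not KafkaIO's actual builder, and topic partitions are modelled as plain strings to keep the example free of Kafka dependencies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for the read builder; NOT KafkaIO's real class.
final class TopicSelection {
  private final List<String> topics;          // never null, empty by default
  private final List<String> topicPartitions; // never null, empty by default
  private final Pattern topicPattern;         // null unless explicitly set

  private TopicSelection(List<String> topics, List<String> topicPartitions, Pattern topicPattern) {
    this.topics = topics;
    this.topicPartitions = topicPartitions;
    this.topicPattern = topicPattern;
  }

  static TopicSelection create() {
    return new TopicSelection(Collections.emptyList(), Collections.emptyList(), null);
  }

  // Each withX() method refuses to run if any selector was set before.
  private void checkNothingSet() {
    if (!topics.isEmpty() || !topicPartitions.isEmpty() || topicPattern != null) {
      throw new IllegalStateException("Only one of topics, topicPartitions, topicPattern can be set");
    }
  }

  TopicSelection withTopics(List<String> newTopics) {
    checkNothingSet();
    return new TopicSelection(new ArrayList<>(newTopics), topicPartitions, topicPattern);
  }

  TopicSelection withTopicPartitions(List<String> newPartitions) {
    checkNothingSet();
    return new TopicSelection(topics, new ArrayList<>(newPartitions), topicPattern);
  }

  TopicSelection withTopicPattern(String regex) {
    checkNothingSet();
    return new TopicSelection(topics, topicPartitions, Pattern.compile(regex));
  }

  // Mirrors the expansion-time check: at least one selector must be set.
  void validate() {
    if (topics.isEmpty() && topicPartitions.isEmpty() && topicPattern == null) {
      throw new IllegalStateException("One of topics, topicPartitions, topicPattern must be set");
    }
  }

  List<String> topics() { return topics; }
  List<String> topicPartitions() { return topicPartitions; }
  Pattern topicPattern() { return topicPattern; }
}
```

Under these assumptions, any instance that passes `validate()` with both lists empty necessarily has a non-null `topicPattern`. An object built by some other path, such as an external transform registrar, would not get that guarantee for free.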
As far as Kafka's topic metadata goes, `.partitionsFor()` will throw an
exception if an unauthorized topic is requested, and `.listTopics()` will list
only authorized topics. Both methods either return initialized objects or throw
an exception. I'd say that the existing null check on `partitionInfoList` seems
superfluous and could be considered for deletion:
```
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
checkState(
partitionInfoList != null,
"Could not find any partitions info. Please check Kafka configuration and make sure "
+ "that provided topics exist.");
for (PartitionInfo p : partitionInfoList) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
```
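For comparison, the pattern branch in the diff iterates the topic metadata directly, with no null check. The following is a rough, dependency-free sketch of that filtering step. `TopicPatternFilter` and `matchingPartitions` are hypothetical names, and a plain `Map<String, List<Integer>>` stands in for the topic-to-partitions map that `.listTopics()` returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of KafkaIO.
final class TopicPatternFilter {
  // Returns "topic-partition" labels for every topic whose FULL name matches
  // the pattern. Matcher.matches() requires the whole name to match, so the
  // pattern "orders" does not select a topic named "orders-eu".
  static List<String> matchingPartitions(Map<String, List<Integer>> metadata, Pattern pattern) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, List<Integer>> entry : metadata.entrySet()) {
      if (pattern.matcher(entry.getKey()).matches()) {
        for (int partition : entry.getValue()) {
          result.add(entry.getKey() + "-" + partition);
        }
      }
    }
    return result;
  }
}
```

Because `matches()` anchors the pattern to the whole topic name, a prefix-style selection needs an explicit wildcard such as `orders.*`.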