sjvanrossum commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1212769417
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr
     @VisibleForTesting final @Nullable List<String> topics;
+    private final @Nullable Pattern topicPattern;
+
     @ProcessElement
     public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
       List<TopicPartition> partitions =
           new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions));
       if (partitions.isEmpty()) {
         try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
-          for (String topic : Preconditions.checkStateNotNull(topics)) {
-            for (PartitionInfo p : consumer.partitionsFor(topic)) {
-              partitions.add(new TopicPartition(p.topic(), p.partition()));
+          List<String> topics = Preconditions.checkStateNotNull(this.topics);
+          if (topics.isEmpty()) {
+            Pattern pattern = Preconditions.checkStateNotNull(topicPattern);
+            for (Map.Entry<String, List<PartitionInfo>> entry :
+                consumer.listTopics().entrySet()) {
+              if (pattern.matcher(entry.getKey()).matches()) {
+                for (PartitionInfo p : entry.getValue()) {
+                  partitions.add(new TopicPartition(p.topic(), p.partition()));
+                }
+              }
+            }
+          } else {
+            for (String topic : topics) {
+              for (PartitionInfo p : consumer.partitionsFor(topic)) {
Review Comment:
Is this referring to `topics`? Both `topicPartitions` and `topics` are initialized as empty lists in the builder by default and are replaced by `.withTopicPartitions()` and `.withTopics()`, respectively, so the argument to the previous `Preconditions.checkStateNotNull(topics)` call in the for loop should never be null. We should take care to carry that invariant forward when we add support for the new `topicPattern` property in KafkaIO's `ExternalTransformRegistrar`, though, since the registrar doesn't guarantee the same object state that the builder does.
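
To make the registrar concern concrete, here is a minimal sketch under assumed names: `RegistrarConfigNormalization` and `ExternalConfig` are hypothetical and do not reflect how KafkaIO's `ExternalTransformRegistrar` actually maps its configuration. The idea is to normalize omitted (null) list fields to empty lists before they reach the builder, so the non-null invariant relied on above still holds.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not Beam code: a registrar-side configuration may leave
// fields null, whereas the builder always initializes them to empty lists.
final class RegistrarConfigNormalization {

  /** Hypothetical stand-in for a configuration populated from an external request. */
  static final class ExternalConfig {
    List<String> topics; // null if the caller omitted it
    List<String> topicPartitions; // null if the caller omitted it
    String topicPattern; // null if the caller omitted it
  }

  /** Returns an empty list for null, otherwise a defensive copy of the input. */
  static <T> List<T> orEmpty(List<T> value) {
    return value == null ? Collections.<T>emptyList() : new ArrayList<>(value);
  }

  public static void main(String[] args) {
    ExternalConfig config = new ExternalConfig();
    config.topicPattern = "orders-.*"; // only the pattern was supplied

    // Normalize before handing the values to the builder, so that checks such as
    // Preconditions.checkStateNotNull(topics) keep holding on this path too.
    List<String> topics = orEmpty(config.topics);
    List<String> topicPartitions = orEmpty(config.topicPartitions);
    System.out.println(topics.isEmpty() && topicPartitions.isEmpty()); // prints "true"
  }
}
```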

Regarding `topicPattern`: if both `topicPartitions` and `topics` are empty, then `topicPattern` must be non-null, because the PTransform's expansion checks that at least one of the three properties is set and each `.withX()` builder method checks that none of them has been set already.
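
The invariant in the previous paragraph can be modeled with a small, hypothetical builder. `ReadSpec` and its methods are illustrative stand-ins rather than KafkaIO's API, and topic partitions are simplified to strings: each `withX()` method rejects the call if a source was already set, and `validate()` plays the role of the expansion-time check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical, simplified model of the invariants described above; this is
// not KafkaIO's Read class, and topic partitions are plain strings here.
final class ReadSpec {
  // Defaults mirror the builder: empty lists and no pattern.
  private List<String> topics = new ArrayList<>();
  private List<String> topicPartitions = new ArrayList<>();
  private Pattern topicPattern = null;

  ReadSpec withTopics(List<String> value) {
    checkNoneSet();
    topics = new ArrayList<>(value);
    return this;
  }

  ReadSpec withTopicPartitions(List<String> value) {
    checkNoneSet();
    topicPartitions = new ArrayList<>(value);
    return this;
  }

  ReadSpec withTopicPattern(String regex) {
    checkNoneSet();
    topicPattern = Pattern.compile(regex);
    return this;
  }

  // Every withX() method rejects the call if any source was already set.
  private void checkNoneSet() {
    if (!topics.isEmpty() || !topicPartitions.isEmpty() || topicPattern != null) {
      throw new IllegalStateException(
          "Only one of topics, topicPartitions, or topicPattern may be set");
    }
  }

  // Expansion-time check: at least one source must be configured.
  void validate() {
    if (topics.isEmpty() && topicPartitions.isEmpty() && topicPattern == null) {
      throw new IllegalStateException(
          "One of topics, topicPartitions, or topicPattern must be set");
    }
  }

  boolean listsEmpty() {
    return topics.isEmpty() && topicPartitions.isEmpty();
  }

  // Mirrors checkStateNotNull(topicPattern) in the DoFn: only meant to be called
  // when listsEmpty() is true, where a validated spec always has a pattern.
  Pattern requirePattern() {
    if (topicPattern == null) {
      throw new IllegalStateException("validate() should have rejected this spec");
    }
    return topicPattern;
  }
}
```

With both checks in place, a spec that passes `validate()` while both lists are empty must carry a pattern, which is why `requirePattern()` cannot fail on that path.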

As for Kafka's topic metadata, `.partitionsFor()` throws an exception if an unauthorized topic is requested, and `.listTopics()` lists only authorized topics. Both methods return initialized objects: potential null responses from the server are translated to empty collections (as far back as `org.apache.kafka:kafka-clients:0.11.0.3`), and an authorization failure results in an exception. The existing check on `partitionInfoList` therefore seems superfluous and could be considered for deletion:
```java
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
checkState(
    partitionInfoList != null,
    "Could not find any partitions info. Please check Kafka configuration and make sure "
        + "that provided topics exist.");
for (PartitionInfo p : partitionInfoList) {
  partitions.add(new TopicPartition(p.topic(), p.partition()));
}
```
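
The pattern branch from the diff, without any null check, can be sketched without a broker by using an in-memory map in place of `consumer.listTopics()`. This is a simplified, hypothetical model (`TopicResolution` and `resolve` are illustrative names): topics map to partition counts instead of `List<PartitionInfo>`, and partitions are rendered as `topic-partition` strings instead of `TopicPartition` objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in: a map from topic name to partition count plays the role
// of consumer.listTopics(), which is assumed here to never return null.
final class TopicResolution {

  /** Returns "topic-partition" strings for every partition of each fully matching topic. */
  static List<String> resolve(Map<String, Integer> listTopicsResult, Pattern pattern) {
    List<String> partitions = new ArrayList<>();
    for (Map.Entry<String, Integer> entry : listTopicsResult.entrySet()) {
      // matches() requires the whole topic name to match, unlike find().
      if (pattern.matcher(entry.getKey()).matches()) {
        for (int p = 0; p < entry.getValue(); p++) {
          partitions.add(entry.getKey() + "-" + p);
        }
      }
    }
    // No null check needed: empty metadata simply yields an empty list.
    return partitions;
  }

  public static void main(String[] args) {
    Map<String, Integer> topics = new LinkedHashMap<>();
    topics.put("orders-eu", 2);
    topics.put("orders-us", 1);
    topics.put("old-orders-eu", 3);
    // prints [orders-eu-0, orders-eu-1, orders-us-0]
    System.out.println(resolve(topics, Pattern.compile("orders-.*")));
  }
}
```

Using `matches()` rather than `find()` means the pattern must cover the entire topic name: `orders-.*` selects `orders-eu` but not `old-orders-eu`, which `find()` would also accept.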
--
This is an automated message from the Apache Git Service.