bvolpato commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1212436172
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr
     @VisibleForTesting final @Nullable List<String> topics;
+    private final @Nullable Pattern topicPattern;
+
     @ProcessElement
     public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
       List<TopicPartition> partitions =
           new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions));
       if (partitions.isEmpty()) {
         try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
-          for (String topic : Preconditions.checkStateNotNull(topics)) {
-            for (PartitionInfo p : consumer.partitionsFor(topic)) {
-              partitions.add(new TopicPartition(p.topic(), p.partition()));
+          List<String> topics = Preconditions.checkStateNotNull(this.topics);
+          if (topics.isEmpty()) {
+            Pattern pattern = Preconditions.checkStateNotNull(topicPattern);
+            for (Map.Entry<String, List<PartitionInfo>> entry :
+                consumer.listTopics().entrySet()) {
+              if (pattern.matcher(entry.getKey()).matches()) {
+                for (PartitionInfo p : entry.getValue()) {
+                  partitions.add(new TopicPartition(p.topic(), p.partition()));
+                }
+              }
+            }
+          } else {
+            for (String topic : topics) {
+              for (PartitionInfo p : consumer.partitionsFor(topic)) {
Review Comment:
Any chance the result of `consumer.partitionsFor(topic)` is null at this point? The split below has null
checks, but this code path does not.
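
For illustration, a minimal sketch of the kind of guard in question, assuming the per-topic metadata lookup can return null for a topic it does not know about. Everything here is hypothetical and stands in for the real types: `PartitionLookupSketch`, `collectPartitions`, and the `partitionsFor` function parameter are not part of KafkaIO or the Kafka client, and plain integers replace `PartitionInfo`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class PartitionLookupSketch {

  /**
   * Collects "topic-partition" labels for every requested topic.
   *
   * <p>{@code partitionsFor} is a hypothetical stand-in for the consumer's per-topic
   * metadata call. It is assumed to return null when the topic is unknown.
   */
  static List<String> collectPartitions(
      List<String> topics, Function<String, List<Integer>> partitionsFor) {
    List<String> partitions = new ArrayList<>();
    for (String topic : topics) {
      List<Integer> partitionIds = partitionsFor.apply(topic);
      // Fail fast with a descriptive message instead of hitting a
      // NullPointerException when iterating in the loop below.
      if (partitionIds == null) {
        throw new IllegalStateException("No partition metadata found for topic " + topic);
      }
      for (int id : partitionIds) {
        partitions.add(topic + "-" + id);
      }
    }
    return partitions;
  }
}
```

Whether to throw or to skip the topic silently is a design choice; throwing keeps a misconfigured topic name from going unnoticed.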