gemini-code-assist[bot] commented on code in PR #35758:
URL: https://github.com/apache/beam/pull/35758#discussion_r2246325586

##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1648,6 +1650,24 @@ private void warnAboutUnsafeConfigurations(PBegin input) {
       }
     }
+    private Read<K, V> deduplicateTopics(Read<K, V> kafkaRead) {
+      if (getTopics() != null && getTopics().size() > 0) {
+        List<String> topics = getTopics();
+        return kafkaRead
+            .toBuilder()
+            .setTopics(topics.stream().distinct().collect(Collectors.toList()))
+            .build();
+      }
+      if (getTopicPartitions() != null && getTopicPartitions().size() > 0) {
+        List<TopicPartition> topicPartitions = getTopicPartitions();
+        return kafkaRead
+            .toBuilder()
+            .setTopicPartitions(topicPartitions.stream().distinct().collect(Collectors.toList()))
+            .build();
+      }
+      return kafkaRead;
+    }

Review Comment:
This method can be made more efficient. Currently, it calls `getTopics()` and `getTopicPartitions()` several times each, and it builds a new `Read` object even when the input list contains no duplicates.

You can improve this by:
1. Storing the results of `getTopics()` and `getTopicPartitions()` in local variables to avoid redundant calls.
2. Only creating a new `Read` object if duplicates were actually found and removed.
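
As a standalone illustration of point 2, here is a minimal sketch that is independent of KafkaIO. The class `DedupSketch`, the helper `distinctOrSame`, and the topic names are hypothetical and exist only for this example; the idea is simply to hand back the original list instance when deduplication would not change it, so the caller can tell that nothing needs rebuilding.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Beam or KafkaIO.
final class DedupSketch {
  private DedupSketch() {}

  /**
   * Returns the input list itself when it contains no duplicates, otherwise a
   * new list with duplicates removed (first occurrence kept, order preserved).
   */
  static <T> List<T> distinctOrSame(List<T> items) {
    if (items == null || items.isEmpty()) {
      return items;
    }
    List<T> distinct = items.stream().distinct().collect(Collectors.toList());
    // distinct() only ever drops elements, so equal sizes mean nothing was removed.
    return distinct.size() == items.size() ? items : distinct;
  }

  public static void main(String[] args) {
    List<String> unique = Arrays.asList("orders", "payments");
    List<String> repeated = Arrays.asList("orders", "payments", "orders");

    // The same instance comes back, so the caller can skip any rebuild.
    System.out.println(distinctOrSame(unique) == unique); // true
    // Duplicates are dropped and encounter order is preserved.
    System.out.println(distinctOrSame(repeated)); // [orders, payments]
  }
}
```

A caller can compare the returned reference with its input (`result == input`) to decide whether a new object is needed at all, which is the same decision the size check makes in the method under review.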

Here's a suggested implementation:

```java
private Read<K, V> deduplicateTopics(Read<K, V> kafkaRead) {
  final List<String> topics = getTopics();
  if (topics != null && !topics.isEmpty()) {
    final List<String> distinctTopics = topics.stream().distinct().collect(Collectors.toList());
    // Same size means no duplicates were removed: reuse the existing Read.
    if (topics.size() == distinctTopics.size()) {
      return kafkaRead;
    }
    return kafkaRead.toBuilder().setTopics(distinctTopics).build();
  }

  final List<TopicPartition> topicPartitions = getTopicPartitions();
  if (topicPartitions != null && !topicPartitions.isEmpty()) {
    final List<TopicPartition> distinctTopicPartitions =
        topicPartitions.stream().distinct().collect(Collectors.toList());
    // Same size means no duplicates were removed: reuse the existing Read.
    if (topicPartitions.size() == distinctTopicPartitions.size()) {
      return kafkaRead;
    }
    return kafkaRead.toBuilder().setTopicPartitions(distinctTopicPartitions).build();
  }

  // Neither topics nor topic partitions are set: nothing to deduplicate.
  return kafkaRead;
}
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org