dan-s1 commented on code in PR #9807:
URL: https://github.com/apache/nifi/pull/9807#discussion_r1999255697
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java:
##########
@@ -28,39 +29,58 @@
import org.apache.nifi.kafka.service.api.consumer.PollingSummary;
import org.apache.nifi.kafka.service.api.header.RecordHeader;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
-import org.apache.nifi.kafka.service.consumer.pool.Subscription;
import org.apache.nifi.logging.ComponentLog;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Kafka 3 Consumer Service implementation with Object Pooling for subscribed
Kafka Consumers
*/
-public class Kafka3ConsumerService implements KafkaConsumerService, Closeable {
+public class Kafka3ConsumerService implements KafkaConsumerService, Closeable,
ConsumerRebalanceListener {
private final ComponentLog componentLog;
private final Consumer<byte[], byte[]> consumer;
private final Subscription subscription;
- private final Duration maxUncommittedTime;
private volatile boolean closed = false;
- public Kafka3ConsumerService(final ComponentLog componentLog, final
Consumer<byte[], byte[]> consumer, final Subscription subscription,
- final Duration maxUncommittedTime) {
-
+ public Kafka3ConsumerService(final ComponentLog componentLog, final
Consumer<byte[], byte[]> consumer, final Subscription subscription) {
this.componentLog = Objects.requireNonNull(componentLog, "Component
Log required");
this.consumer = consumer;
this.subscription = subscription;
- this.maxUncommittedTime = maxUncommittedTime;
+
+ final Optional<Pattern> topicPatternFound =
subscription.getTopicPattern();
+ if (topicPatternFound.isPresent()) {
+ final Pattern topicPattern = topicPatternFound.get();
+ consumer.subscribe(topicPattern, this);
+ } else {
+ final Collection<String> topics = subscription.getTopics();
+ consumer.subscribe(topics, this);
+ }
+ }
+
+ @Override
+ public void onPartitionsAssigned(final Collection<TopicPartition>
partitions) {
+ componentLog.info("Kafka assigned the following Partitions to this
consumer: {}", partitions);
+ }
+
+ @Override
+ public void onPartitionsRevoked(final Collection<TopicPartition>
partitions) {
+ componentLog.info("Kafka revoked the following Partitions from this
consumer: {}", partitions);
+ rollback(new HashSet<>(partitions));
Review Comment:
Couldn't `Set.of` be sufficient?
```suggestion
rollback(Set.of(partitions));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]