pvillard31 commented on code in PR #9807:
URL: https://github.com/apache/nifi/pull/9807#discussion_r1999449027


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java:
##########
@@ -28,39 +29,58 @@
 import org.apache.nifi.kafka.service.api.consumer.PollingSummary;
 import org.apache.nifi.kafka.service.api.header.RecordHeader;
 import org.apache.nifi.kafka.service.api.record.ByteRecord;
-import org.apache.nifi.kafka.service.consumer.pool.Subscription;
 import org.apache.nifi.logging.ComponentLog;
 
 import java.io.Closeable;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
  * Kafka 3 Consumer Service implementation with Object Pooling for subscribed 
Kafka Consumers
  */
-public class Kafka3ConsumerService implements KafkaConsumerService, Closeable {
+public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, 
ConsumerRebalanceListener {
 
     private final ComponentLog componentLog;
     private final Consumer<byte[], byte[]> consumer;
     private final Subscription subscription;
-    private final Duration maxUncommittedTime;
     private volatile boolean closed = false;
 
-    public Kafka3ConsumerService(final ComponentLog componentLog, final 
Consumer<byte[], byte[]> consumer, final Subscription subscription,
-            final Duration maxUncommittedTime) {
-
+    public Kafka3ConsumerService(final ComponentLog componentLog, final 
Consumer<byte[], byte[]> consumer, final Subscription subscription) {
         this.componentLog = Objects.requireNonNull(componentLog, "Component 
Log required");
         this.consumer = consumer;
         this.subscription = subscription;
-        this.maxUncommittedTime = maxUncommittedTime;
+
+        final Optional<Pattern> topicPatternFound = 
subscription.getTopicPattern();
+        if (topicPatternFound.isPresent()) {
+            final Pattern topicPattern = topicPatternFound.get();
+            consumer.subscribe(topicPattern, this);
+        } else {
+            final Collection<String> topics = subscription.getTopics();
+            consumer.subscribe(topics, this);
+        }
+    }
+
+    @Override
+    public void onPartitionsAssigned(final Collection<TopicPartition> 
partitions) {
+        componentLog.info("Kafka assigned the following Partitions to this 
consumer: {}", partitions);
+    }
+
+    @Override
+    public void onPartitionsRevoked(final Collection<TopicPartition> 
partitions) {
+        componentLog.info("Kafka revoked the following Partitions from this 
consumer: {}", partitions);
+        rollback(new HashSet<>(partitions));

Review Comment:
   I could use `Set.copyOf()` but not sure to see the benefit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to