exceptionfactory commented on code in PR #10769:
URL: https://github.com/apache/nifi/pull/10769#discussion_r2764601238


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java:
##########
@@ -135,6 +161,13 @@ public Iterable<ByteRecord> poll(final Duration 
maxWaitDuration) {
             return List.of();
         }
 
+        // Track the maximum offset for each partition to commit during 
rebalance
+        for (final ConsumerRecord<byte[], byte[]> record : consumerRecords) {

Review Comment:
   This approach requires iterating over all Records twice: once here, and 
again in actual usage in `RecordIterable`. The `RecordIterable` is designed 
for optimal iteration and client usage. Instead of this loop, I recommend 
adjusting `RecordIterable` to track the offsets as records are retrieved. This 
should also provide the opportunity to get the `TopicPartition` instances once, 
and avoid creating a new instance for every Record.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerServiceTest.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.consumer;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class Kafka3ConsumerServiceTest {
+
+    private static final String TOPIC = "test-topic";
+    private static final String GROUP_ID = "test-group";
+    private static final int PARTITION_0 = 0;
+    private static final int PARTITION_1 = 1;
+
+    @Mock
+    private Consumer<byte[], byte[]> consumer;
+
+    @Mock
+    private ComponentLog componentLog;
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> 
offsetsCaptor;
+
+    private Kafka3ConsumerService consumerService;
+
+    @BeforeEach
+    void setUp() {
+        final Subscription subscription = new Subscription(GROUP_ID, 
Collections.singletonList(TOPIC), AutoOffsetReset.EARLIEST);
+        consumerService = new Kafka3ConsumerService(componentLog, consumer, 
subscription);
+    }
+
+    @Test
+    void testOnPartitionsRevokedCommitsUncommittedOffsets() {
+        // Arrange: Simulate polling records from two partitions
+        final TopicPartition partition0 = new TopicPartition(TOPIC, 
PARTITION_0);
+        final TopicPartition partition1 = new TopicPartition(TOPIC, 
PARTITION_1);
+
+        final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, 
PARTITION_0, 5L);
+        final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, 
PARTITION_1, 10L);
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
recordsMap = new HashMap<>();
+        recordsMap.put(partition0, List.of(record0));
+        recordsMap.put(partition1, List.of(record1));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = 
createConsumerRecords(recordsMap);
+
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+        // Act: Poll records (this should track the offsets internally)
+        final Iterable<ByteRecord> polledRecords = 
consumerService.poll(Duration.ofMillis(100));
+        // Consume the iterator to ensure records are processed
+        for (ByteRecord ignored : polledRecords) {
+            // Just iterate through
+        }
+
+        // Act: Simulate rebalance - partitions being revoked
+        final Collection<TopicPartition> revokedPartitions = 
List.of(partition0, partition1);
+        consumerService.onPartitionsRevoked(revokedPartitions);
+
+        // Assert: Verify that offsets were committed for the revoked 
partitions
+        verify(consumer).commitSync(offsetsCaptor.capture());
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
offsetsCaptor.getValue();
+
+        assertEquals(2, committedOffsets.size());
+        // Offset should be record.offset + 1 (next offset to consume)
+        assertEquals(6L, committedOffsets.get(partition0).offset());
+        assertEquals(11L, committedOffsets.get(partition1).offset());
+    }
+
+    @Test
+    void testOnPartitionsRevokedWithNoUncommittedOffsets() {
+        // Arrange: No records polled
+        final TopicPartition partition0 = new TopicPartition(TOPIC, 
PARTITION_0);
+
+        // Act: Simulate rebalance without any prior polling
+        consumerService.onPartitionsRevoked(List.of(partition0));
+
+        // Assert: No commit should be called since there are no uncommitted 
offsets
+        verify(consumer, never()).commitSync(anyMap());
+    }
+
+    @Test
+    void testOnPartitionsRevokedOnlyCommitsRevokedPartitions() {
+        // Arrange: Poll records from two partitions
+        final TopicPartition partition0 = new TopicPartition(TOPIC, 
PARTITION_0);
+        final TopicPartition partition1 = new TopicPartition(TOPIC, 
PARTITION_1);
+
+        final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, 
PARTITION_0, 5L);
+        final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, 
PARTITION_1, 10L);
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
recordsMap = new HashMap<>();
+        recordsMap.put(partition0, List.of(record0));
+        recordsMap.put(partition1, List.of(record1));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = 
createConsumerRecords(recordsMap);
+
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+        // Act: Poll records
+        final Iterable<ByteRecord> polledRecords = 
consumerService.poll(Duration.ofMillis(100));
+        for (ByteRecord ignored : polledRecords) {
+            // Just iterate through
+        }
+
+        // Act: Only revoke partition 0, keep partition 1
+        consumerService.onPartitionsRevoked(List.of(partition0));
+
+        // Assert: Only partition 0 should be committed
+        verify(consumer).commitSync(offsetsCaptor.capture());
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
offsetsCaptor.getValue();
+
+        assertEquals(1, committedOffsets.size());
+        assertEquals(6L, committedOffsets.get(partition0).offset());
+        assertFalse(committedOffsets.containsKey(partition1));
+    }
+
+    @Test
+    void testOnPartitionsRevokedTracksMaxOffset() {
+        // Arrange: Poll multiple records from same partition
+        final TopicPartition partition0 = new TopicPartition(TOPIC, 
PARTITION_0);
+
+        final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, 
PARTITION_0, 5L);
+        final ConsumerRecord<byte[], byte[]> record2 = createRecord(TOPIC, 
PARTITION_0, 7L);
+        final ConsumerRecord<byte[], byte[]> record3 = createRecord(TOPIC, 
PARTITION_0, 6L); // Out of order
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
recordsMap = new HashMap<>();
+        recordsMap.put(partition0, List.of(record1, record2, record3));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = 
createConsumerRecords(recordsMap);
+
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+        // Act: Poll records
+        final Iterable<ByteRecord> polledRecords = 
consumerService.poll(Duration.ofMillis(100));
+        for (ByteRecord ignored : polledRecords) {
+            // Just iterate through
+        }
+
+        // Act: Revoke partition
+        consumerService.onPartitionsRevoked(List.of(partition0));
+
+        // Assert: Should commit max offset + 1 (7 + 1 = 8)
+        verify(consumer).commitSync(offsetsCaptor.capture());
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
offsetsCaptor.getValue();
+
+        assertEquals(1, committedOffsets.size());
+        assertEquals(8L, committedOffsets.get(partition0).offset());
+    }
+
+    @Test
+    void testRollbackClearsUncommittedOffsets() {
+        // Arrange: Poll records
+        final TopicPartition partition0 = new TopicPartition(TOPIC, 
PARTITION_0);
+
+        final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, 
PARTITION_0, 5L);
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
recordsMap = new HashMap<>();
+        recordsMap.put(partition0, List.of(record0));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = 
createConsumerRecords(recordsMap);
+
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+        
when(consumer.assignment()).thenReturn(Collections.singleton(partition0));
+        
when(consumer.committed(any())).thenReturn(Collections.singletonMap(partition0, 
new OffsetAndMetadata(0L)));
+
+        // Act: Poll records
+        final Iterable<ByteRecord> polledRecords = 
consumerService.poll(Duration.ofMillis(100));
+        for (ByteRecord ignored : polledRecords) {
+            // Just iterate through
+        }
+
+        // Act: Rollback

Review Comment:
   I generally recommend avoiding the `Arrange / Act / Assert` comments. Some 
of them provide useful details, so they could be retained, but others just 
repeat what the method call is named.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.consumer.Subscription;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Integration tests for verifying that ConsumeKafka correctly handles 
consumer group rebalances
+ * without causing duplicate message processing.
+ */
+class ConsumeKafkaRebalanceIT extends AbstractConsumeKafkaIT {
+
+    /**
+     * Tests that when onPartitionsRevoked is called (simulating rebalance), 
the consumer
+     * correctly commits offsets, and a subsequent consumer in the same group 
doesn't
+     * re-consume the same messages (no duplicates).
+     *
+     * This test:
+     * 1. Produces messages to a multi-partition topic
+     * 2. Consumer 1 polls and processes messages
+     * 3. Simulates rebalance by calling onPartitionsRevoked on Consumer 1
+     * 4. Consumer 2 joins and continues consuming from committed offsets
+     * 5. Verifies no duplicate messages were consumed
+     */
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.SECONDS)
+    void testRebalanceDoesNotCauseDuplicates() throws Exception {
+        final String topic = "rebalance-test-" + UUID.randomUUID();
+        final String groupId = "rebalance-group-" + UUID.randomUUID();
+        final int numPartitions = 3;
+        final int messagesPerPartition = 20;
+        final int totalMessages = numPartitions * messagesPerPartition;
+
+        // Create topic with multiple partitions
+        createTopic(topic, numPartitions);
+
+        // Produce messages to all partitions
+        produceMessagesToTopic(topic, numPartitions, messagesPerPartition);
+
+        // Track consumed messages to detect duplicates
+        final Set<String> consumedMessages = new HashSet<>();
+        final AtomicInteger duplicateCount = new AtomicInteger(0);
+
+        final ComponentLog mockLog = mock(ComponentLog.class);
+
+        // Consumer 1: Poll some messages, then simulate rebalance
+        final Properties props1 = getConsumerProperties(groupId);
+        try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new 
KafkaConsumer<>(props1)) {
+            final Subscription subscription = new Subscription(groupId, 
Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+            final Kafka3ConsumerService service1 = new 
Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);
+
+            // Poll about half the messages
+            int consumer1Count = 0;
+            int maxAttempts = 20;
+            while (consumer1Count < totalMessages / 2 && maxAttempts-- > 0) {
+                for (ByteRecord record : service1.poll(Duration.ofSeconds(2))) 
{
+                    final String messageId = record.getTopic() + "-" + 
record.getPartition() + "-" + record.getOffset();
+                    if (!consumedMessages.add(messageId)) {
+                        duplicateCount.incrementAndGet();
+                    }
+                    consumer1Count++;
+                }
+            }
+
+            // Simulate rebalance - this should commit the offsets
+            final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
+            service1.onPartitionsRevoked(assignment);
+            service1.close();
+        }
+
+        // Consumer 2: Should continue from where Consumer 1 left off (no 
duplicates)
+        final Properties props2 = getConsumerProperties(groupId);
+        try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new 
KafkaConsumer<>(props2)) {
+            final Subscription subscription = new Subscription(groupId, 
Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+            final Kafka3ConsumerService service2 = new 
Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);
+
+            // Poll remaining messages
+            int emptyPolls = 0;
+            while (emptyPolls < 5 && consumedMessages.size() < totalMessages) {
+                boolean hasRecords = false;
+                for (ByteRecord record : service2.poll(Duration.ofSeconds(2))) 
{
+                    hasRecords = true;
+                    final String messageId = record.getTopic() + "-" + 
record.getPartition() + "-" + record.getOffset();
+                    if (!consumedMessages.add(messageId)) {
+                        duplicateCount.incrementAndGet();
+                    }
+                }
+                if (!hasRecords) {
+                    emptyPolls++;
+                } else {
+                    emptyPolls = 0;
+                }
+            }
+
+            service2.close();
+        }
+
+        // Verify results

Review Comment:
   ```suggestion
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.consumer.Subscription;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Integration tests for verifying that ConsumeKafka correctly handles 
consumer group rebalances
+ * without causing duplicate message processing.
+ */
+class ConsumeKafkaRebalanceIT extends AbstractConsumeKafkaIT {
+
+    /**
+     * Tests that when onPartitionsRevoked is called (simulating rebalance), 
the consumer
+     * correctly commits offsets, and a subsequent consumer in the same group 
doesn't
+     * re-consume the same messages (no duplicates).
+     *
+     * This test:
+     * 1. Produces messages to a multi-partition topic
+     * 2. Consumer 1 polls and processes messages
+     * 3. Simulates rebalance by calling onPartitionsRevoked on Consumer 1
+     * 4. Consumer 2 joins and continues consuming from committed offsets
+     * 5. Verifies no duplicate messages were consumed
+     */
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.SECONDS)
+    void testRebalanceDoesNotCauseDuplicates() throws Exception {
+        final String topic = "rebalance-test-" + UUID.randomUUID();
+        final String groupId = "rebalance-group-" + UUID.randomUUID();

Review Comment:
   I recommend declaring static constants for the topic and partition values 
that are reused across test methods.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.consumer.Subscription;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Integration tests for verifying that ConsumeKafka correctly handles 
consumer group rebalances
+ * without causing duplicate message processing.
+ */
+class ConsumeKafkaRebalanceIT extends AbstractConsumeKafkaIT {
+
+    /**
+     * Tests that when onPartitionsRevoked is called (simulating rebalance), 
the consumer
+     * correctly commits offsets, and a subsequent consumer in the same group 
doesn't
+     * re-consume the same messages (no duplicates).
+     *
+     * This test:
+     * 1. Produces messages to a multi-partition topic
+     * 2. Consumer 1 polls and processes messages
+     * 3. Simulates rebalance by calling onPartitionsRevoked on Consumer 1
+     * 4. Consumer 2 joins and continues consuming from committed offsets
+     * 5. Verifies no duplicate messages were consumed
+     */
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.SECONDS)
+    void testRebalanceDoesNotCauseDuplicates() throws Exception {
+        final String topic = "rebalance-test-" + UUID.randomUUID();
+        final String groupId = "rebalance-group-" + UUID.randomUUID();
+        final int numPartitions = 3;
+        final int messagesPerPartition = 20;
+        final int totalMessages = numPartitions * messagesPerPartition;
+
+        // Create topic with multiple partitions
+        createTopic(topic, numPartitions);
+
+        // Produce messages to all partitions
+        produceMessagesToTopic(topic, numPartitions, messagesPerPartition);
+
+        // Track consumed messages to detect duplicates
+        final Set<String> consumedMessages = new HashSet<>();
+        final AtomicInteger duplicateCount = new AtomicInteger(0);
+
+        final ComponentLog mockLog = mock(ComponentLog.class);
+
+        // Consumer 1: Poll some messages, then simulate rebalance
+        final Properties props1 = getConsumerProperties(groupId);
+        try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new 
KafkaConsumer<>(props1)) {
+            final Subscription subscription = new Subscription(groupId, 
Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+            final Kafka3ConsumerService service1 = new 
Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);
+
+            // Poll about half the messages
+            int consumer1Count = 0;
+            int maxAttempts = 20;
+            while (consumer1Count < totalMessages / 2 && maxAttempts-- > 0) {
+                for (ByteRecord record : service1.poll(Duration.ofSeconds(2))) 
{
+                    final String messageId = record.getTopic() + "-" + 
record.getPartition() + "-" + record.getOffset();
+                    if (!consumedMessages.add(messageId)) {
+                        duplicateCount.incrementAndGet();
+                    }
+                    consumer1Count++;
+                }
+            }
+
+            // Simulate rebalance - this should commit the offsets
+            final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
+            service1.onPartitionsRevoked(assignment);
+            service1.close();
+        }
+
+        // Consumer 2: Should continue from where Consumer 1 left off (no 
duplicates)
+        final Properties props2 = getConsumerProperties(groupId);
+        try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new 
KafkaConsumer<>(props2)) {
+            final Subscription subscription = new Subscription(groupId, 
Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+            final Kafka3ConsumerService service2 = new 
Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);
+
+            // Poll remaining messages
+            int emptyPolls = 0;
+            while (emptyPolls < 5 && consumedMessages.size() < totalMessages) {
+                boolean hasRecords = false;
+                for (ByteRecord record : service2.poll(Duration.ofSeconds(2))) 
{
+                    hasRecords = true;
+                    final String messageId = record.getTopic() + "-" + 
record.getPartition() + "-" + record.getOffset();
+                    if (!consumedMessages.add(messageId)) {
+                        duplicateCount.incrementAndGet();
+                    }
+                }
+                if (!hasRecords) {
+                    emptyPolls++;
+                } else {
+                    emptyPolls = 0;
+                }
+            }
+
+            service2.close();
+        }
+
+        // Verify results
+        assertEquals(0, duplicateCount.get(),
+                "Expected no duplicate messages but found " + 
duplicateCount.get());
+        assertEquals(totalMessages, consumedMessages.size(),
+                "Expected to consume " + totalMessages + " unique messages but 
got " + consumedMessages.size());
+    }
+
+    /**
+     * Tests that offsets are committed during rebalance by simulating the 
onPartitionsRevoked callback.
+     * This test:
+     * 1. Creates a consumer and polls messages
+     * 2. Manually invokes onPartitionsRevoked (simulating what Kafka does 
during rebalance)
+     * 3. Verifies that offsets were committed to Kafka
+     */
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.SECONDS)
+    void testOffsetsCommittedDuringRebalance() throws Exception {
+        final String topic = "rebalance-offset-test-" + UUID.randomUUID();
+        final String groupId = "rebalance-offset-group-" + UUID.randomUUID();
+
+        // Create topic with multiple partitions
+        createTopic(topic, 3);
+
+        // Produce some messages (10 per partition = 30 total)
+        final int messagesPerPartition = 10;
+        produceMessagesToTopic(topic, 3, messagesPerPartition);
+
+        final ComponentLog mockLog = mock(ComponentLog.class);
+
+        // Create consumer and poll messages
+        final Properties props = getConsumerProperties(groupId);
+        try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new 
KafkaConsumer<>(props)) {
+            final Subscription subscription = new Subscription(groupId, 
Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+            final Kafka3ConsumerService service = new 
Kafka3ConsumerService(mockLog, kafkaConsumer, subscription);
+
+            // Poll messages until we have some
+            int polledCount = 0;
+            int maxAttempts = 20;
+            while (polledCount < 15 && maxAttempts-- > 0) {
+                for (ByteRecord ignored : service.poll(Duration.ofSeconds(2))) 
{
+                    polledCount++;
+                }
+            }
+
+            assertTrue(polledCount > 0, "Should have polled at least some 
messages");
+
+            // Get the current assignment before simulating rebalance
+            final Set<TopicPartition> assignment = kafkaConsumer.assignment();
+            assertFalse(assignment.isEmpty(), "Consumer should have partition 
assignments");
+
+            // Simulate rebalance by calling onPartitionsRevoked
+            // This is what Kafka calls when a rebalance occurs
+            service.onPartitionsRevoked(assignment);
+
+            // Close the service
+            service.close();
+        }
+
+        // Verify that offsets were committed by checking with a new consumer
+        try (KafkaConsumer<byte[], byte[]> verifyConsumer = new 
KafkaConsumer<>(getConsumerProperties(groupId))) {
+            final Set<TopicPartition> partitions = new HashSet<>();
+            for (int i = 0; i < 3; i++) {
+                partitions.add(new TopicPartition(topic, i));
+            }
+
+            final Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
verifyConsumer.committed(partitions);
+
+            // At least some offsets should be committed
+            long totalCommitted = committedOffsets.values().stream()
+                    .filter(o -> o != null)
+                    .mapToLong(OffsetAndMetadata::offset)
+                    .sum();
+
+            assertTrue(totalCommitted > 0,
+                    "Expected offsets to be committed during 
onPartitionsRevoked, but total committed offset was " + totalCommitted);
+        }
+    }
+
+    /**
+     * Produces messages to a specific topic with a given number of partitions.
+     */
+    private void produceMessagesToTopic(final String topic, final int 
numPartitions, final int messagesPerPartition) throws Exception {
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+
+        try (KafkaProducer<String, String> producer = new 
KafkaProducer<>(producerProps)) {
+            for (int partition = 0; partition < numPartitions; partition++) {
+                for (int i = 0; i < messagesPerPartition; i++) {
+                    final String key = "key-" + partition + "-" + i;
+                    final String value = "value-" + partition + "-" + i;
+                    producer.send(new ProducerRecord<>(topic, partition, key, 
value)).get();
+                }
+            }
+        }
+    }
+
+    private void createTopic(final String topic, final int numPartitions) 
throws Exception {
+        final Properties adminProps = new Properties();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers());
+
+        try (Admin admin = Admin.create(adminProps)) {
+            final NewTopic newTopic = new NewTopic(topic, numPartitions, 
(short) 1);
+            
admin.createTopics(Collections.singletonList(newTopic)).all().get(30, 
TimeUnit.SECONDS);
+        }
+
+        // Wait for topic to be fully created
+        Thread.sleep(1000);

Review Comment:
   Is there a better way to do this than sleeping?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to