This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ede26f48d7 NIFI-15464 Commit pending offsets for revoked partitions in ConsumeKafka (#10769)
ede26f48d7 is described below

commit ede26f48d7ee3d6b54b8bc4da5cda205abcc9917
Author: Pierre Villard <[email protected]>
AuthorDate: Sat Feb 7 20:12:07 2026 +0100

    NIFI-15464 Commit pending offsets for revoked partitions in ConsumeKafka (#10769)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../kafka/processors/ConsumeKafkaRebalanceIT.java  | 360 +++++++++++++++++++++
 .../apache/nifi/kafka/processors/ConsumeKafka.java |  33 +-
 .../service/api/consumer/KafkaConsumerService.java |  39 +++
 .../service/consumer/Kafka3ConsumerService.java    | 109 ++++++-
 .../consumer/Kafka3ConsumerServiceTest.java        | 287 ++++++++++++++++
 5 files changed, 817 insertions(+), 11 deletions(-)

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java
new file mode 100644
index 0000000000..d4a8103abe
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.consumer.Subscription;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Integration tests for verifying that ConsumeKafka correctly handles consumer group rebalances
+ * without causing duplicate message processing.
+ */
+class ConsumeKafkaRebalanceIT extends AbstractConsumeKafkaIT {
+
+    private static final int NUM_PARTITIONS = 3;
+    private static final int MESSAGES_PER_PARTITION = 20;
+
+    /**
+     * Tests that when onPartitionsRevoked is called (simulating rebalance), the consumer
+     * correctly commits offsets, and a subsequent consumer in the same group doesn't
+     * re-consume the same messages (no duplicates).
+     *
+     * This test:
+     * 1. Produces messages to a multi-partition topic
+     * 2. Consumer 1 polls and processes messages
+     * 3. Simulates rebalance by calling onPartitionsRevoked on Consumer 1
+     * 4. Consumer 2 joins and continues consuming from committed offsets
+     * 5. Verifies no duplicate messages were consumed
+     */
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.SECONDS)
+    void testRebalanceDoesNotCauseDuplicates() throws Exception {
+        final String topic = "rebalance-test-" + UUID.randomUUID();
+        final String groupId = "rebalance-group-" + UUID.randomUUID();
+        final int totalMessages = NUM_PARTITIONS * MESSAGES_PER_PARTITION;
+
+        createTopic(topic, NUM_PARTITIONS);
+        produceMessagesToTopic(topic, NUM_PARTITIONS, MESSAGES_PER_PARTITION);
+
+        final Set<String> consumedMessages = new HashSet<>();
+        final AtomicInteger duplicateCount = new AtomicInteger(0);
+        final ComponentLog mockLog = mock(ComponentLog.class);
+
+        final Properties props1 = getConsumerProperties(groupId);
+        try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new 
KafkaConsumer<>(props1)) {
+            final Subscription subscription = new Subscription(groupId, 
Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+            final Kafka3ConsumerService service1 = new 
Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);
+
+            int consumer1Count = 0;
+            int maxAttempts = 20;
+            while (consumer1Count < totalMessages / 2 && maxAttempts-- > 0) {
+                for (ByteRecord record : service1.poll(Duration.ofSeconds(2))) 
{
+                    final String messageId = record.getTopic() + "-" + 
record.getPartition() + "-" + record.getOffset();
+                    if (!consumedMessages.add(messageId)) {
+                        duplicateCount.incrementAndGet();
+                    }
+                    consumer1Count++;
+                }
+            }
+
+            final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
+            service1.onPartitionsRevoked(assignment);
+            // Simulate processor committing offsets after successful session 
commit
+            service1.commitOffsetsForRevokedPartitions();
+            service1.close();
+        }
+
+        final Properties props2 = getConsumerProperties(groupId);
+        try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new 
KafkaConsumer<>(props2)) {
+            final Subscription subscription = new Subscription(groupId, 
Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+            final Kafka3ConsumerService service2 = new 
Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);
+
+            int emptyPolls = 0;
+            while (emptyPolls < 5 && consumedMessages.size() < totalMessages) {
+                boolean hasRecords = false;
+                for (ByteRecord record : service2.poll(Duration.ofSeconds(2))) 
{
+                    hasRecords = true;
+                    final String messageId = record.getTopic() + "-" + 
record.getPartition() + "-" + record.getOffset();
+                    if (!consumedMessages.add(messageId)) {
+                        duplicateCount.incrementAndGet();
+                    }
+                }
+                if (!hasRecords) {
+                    emptyPolls++;
+                } else {
+                    emptyPolls = 0;
+                }
+            }
+
+            service2.close();
+        }
+
+        assertEquals(0, duplicateCount.get(),
+                "Expected no duplicate messages but found " + 
duplicateCount.get());
+        assertEquals(totalMessages, consumedMessages.size(),
+                "Expected to consume " + totalMessages + " unique messages but 
got " + consumedMessages.size());
+    }
+
+    /**
+     * Tests that offsets can be committed after rebalance when processor calls commitOffsetsForRevokedPartitions.
+     *
+     * This test:
+     * 1. Creates a consumer and polls messages
+     * 2. Manually invokes onPartitionsRevoked (simulating what Kafka does during rebalance)
+     * 3. Calls commitOffsetsForRevokedPartitions (simulating processor committing after session commit)
+     * 4. Verifies that offsets were committed to Kafka
+     */
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.SECONDS)
+    void testOffsetsCommittedDuringRebalance() throws Exception {
+        final String topic = "rebalance-offset-test-" + UUID.randomUUID();
+        final String groupId = "rebalance-offset-group-" + UUID.randomUUID();
+        final int messagesPerPartition = 10;
+
+        createTopic(topic, NUM_PARTITIONS);
+        produceMessagesToTopic(topic, NUM_PARTITIONS, messagesPerPartition);
+
+        final ComponentLog mockLog = mock(ComponentLog.class);
+        final Properties props = getConsumerProperties(groupId);
+
+        try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new 
KafkaConsumer<>(props)) {
+            final Subscription subscription = new Subscription(groupId, 
Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+            final Kafka3ConsumerService service = new 
Kafka3ConsumerService(mockLog, kafkaConsumer, subscription);
+
+            int polledCount = 0;
+            int maxAttempts = 20;
+            while (polledCount < 15 && maxAttempts-- > 0) {
+                for (ByteRecord ignored : service.poll(Duration.ofSeconds(2))) 
{
+                    polledCount++;
+                }
+            }
+
+            assertTrue(polledCount > 0, "Should have polled at least some 
messages");
+
+            final Set<TopicPartition> assignment = kafkaConsumer.assignment();
+            assertFalse(assignment.isEmpty(), "Consumer should have partition 
assignments");
+
+            service.onPartitionsRevoked(assignment);
+            // Simulate processor committing offsets after successful session 
commit
+            service.commitOffsetsForRevokedPartitions();
+            service.close();
+        }
+
+        try (KafkaConsumer<byte[], byte[]> verifyConsumer = new 
KafkaConsumer<>(getConsumerProperties(groupId))) {
+            final Set<TopicPartition> partitions = new HashSet<>();
+            for (int i = 0; i < NUM_PARTITIONS; i++) {
+                partitions.add(new TopicPartition(topic, i));
+            }
+
+            final Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
verifyConsumer.committed(partitions);
+
+            long totalCommitted = committedOffsets.values().stream()
+                    .filter(o -> o != null)
+                    .mapToLong(OffsetAndMetadata::offset)
+                    .sum();
+
+            assertTrue(totalCommitted > 0,
+                    "Expected offsets to be committed after 
commitOffsetsForRevokedPartitions, but total committed offset was " + 
totalCommitted);
+        }
+    }
+
+    /**
+     * Tests that records are NOT lost when a rebalance occurs before processing is complete.
+     *
+     * This test simulates the scenario where:
+     * 1. Consumer polls and iterates through records (tracking offsets internally)
+     * 2. Rebalance occurs (onPartitionsRevoked called) BEFORE the processor commits its session
+     * 3. Consumer "fails" (simulating crash or processing failure) without committing offsets
+     * 4. New consumer joins with the same group
+     * 5. The new consumer receives the same records since they were never successfully processed
+     */
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.SECONDS)
+    void testNoDataLossWhenRebalanceOccursBeforeProcessingComplete() throws 
Exception {
+        final String topic = "dataloss-test-" + UUID.randomUUID();
+        final String groupId = "dataloss-group-" + UUID.randomUUID();
+        final int messagesPerPartition = 10;
+        final int totalMessages = NUM_PARTITIONS * messagesPerPartition;
+
+        createTopic(topic, NUM_PARTITIONS);
+        produceMessagesToTopic(topic, NUM_PARTITIONS, messagesPerPartition);
+
+        final ComponentLog mockLog = mock(ComponentLog.class);
+        int recordsPolledByFirstConsumer = 0;
+
+        // Consumer 1: Poll and iterate records, then rebalance occurs, but 
processing "fails"
+        final Properties props1 = getConsumerProperties(groupId);
+        try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new 
KafkaConsumer<>(props1)) {
+            final Subscription subscription = new Subscription(groupId, 
Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+            final Kafka3ConsumerService service1 = new 
Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);
+
+            // Poll and iterate through records - this tracks offsets 
internally
+            int maxAttempts = 20;
+            while (recordsPolledByFirstConsumer < totalMessages && 
maxAttempts-- > 0) {
+                for (ByteRecord ignored : 
service1.poll(Duration.ofSeconds(2))) {
+                    recordsPolledByFirstConsumer++;
+                }
+            }
+
+            assertTrue(recordsPolledByFirstConsumer > 0, "First consumer 
should have polled some records");
+
+            // Simulate rebalance occurring before processor commits its 
session
+            final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
+            assertFalse(assignment.isEmpty(), "Consumer should have partition 
assignments");
+            service1.onPartitionsRevoked(assignment);
+
+            // DO NOT call any "commit" or "process" method - simulating that 
the processor
+            // never completed processing (e.g., session commit failed, 
process crashed, etc.)
+
+            service1.close();
+        }
+
+        // Consumer 2: Should receive the SAME records because processing was 
never completed
+        int recordsPolledBySecondConsumer = 0;
+        final Properties props2 = getConsumerProperties(groupId);
+        try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new 
KafkaConsumer<>(props2)) {
+            final Subscription subscription = new Subscription(groupId, 
Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+            final Kafka3ConsumerService service2 = new 
Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);
+
+            // Poll for records - if no data loss, we should get the same 
records again
+            int emptyPolls = 0;
+            while (emptyPolls < 5) {
+                boolean hasRecords = false;
+                for (ByteRecord ignored : 
service2.poll(Duration.ofSeconds(2))) {
+                    hasRecords = true;
+                    recordsPolledBySecondConsumer++;
+                }
+                if (!hasRecords) {
+                    emptyPolls++;
+                } else {
+                    emptyPolls = 0;
+                }
+            }
+
+            service2.close();
+        }
+
+        // Records should NOT be lost - the second consumer should receive
+        // at least the records that were polled by the first consumer but 
never processed
+        assertTrue(recordsPolledBySecondConsumer >= 
recordsPolledByFirstConsumer,
+                "Data loss detected! First consumer polled " + 
recordsPolledByFirstConsumer +
+                " records but second consumer only received " + 
recordsPolledBySecondConsumer +
+                " records. Expected second consumer to receive at least " + 
recordsPolledByFirstConsumer +
+                " records since processing was never completed.");
+    }
+
+    /**
+     * Produces messages to a specific topic with a given number of partitions.
+     */
+    private void produceMessagesToTopic(final String topic, final int 
numPartitions, final int messagesPerPartition) throws Exception {
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+
+        try (KafkaProducer<String, String> producer = new 
KafkaProducer<>(producerProps)) {
+            for (int partition = 0; partition < numPartitions; partition++) {
+                for (int i = 0; i < messagesPerPartition; i++) {
+                    final String key = "key-" + partition + "-" + i;
+                    final String value = "value-" + partition + "-" + i;
+                    producer.send(new ProducerRecord<>(topic, partition, key, 
value)).get();
+                }
+            }
+        }
+    }
+
+    private void createTopic(final String topic, final int numPartitions) 
throws Exception {
+        final Properties adminProps = new Properties();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers());
+
+        try (Admin admin = Admin.create(adminProps)) {
+            final NewTopic newTopic = new NewTopic(topic, numPartitions, 
(short) 1);
+            
admin.createTopics(Collections.singletonList(newTopic)).all().get(30, 
TimeUnit.SECONDS);
+            waitForTopicReady(admin, topic, numPartitions);
+        }
+    }
+
+    private void waitForTopicReady(final Admin admin, final String topic, 
final int expectedPartitions) throws Exception {
+        final long startTime = System.currentTimeMillis();
+        final long timeoutMillis = 30000;
+
+        while (System.currentTimeMillis() - startTime < timeoutMillis) {
+            try {
+                final Map<String, TopicDescription> descriptions = 
admin.describeTopics(Collections.singletonList(topic))
+                        .allTopicNames()
+                        .get(10, TimeUnit.SECONDS);
+                final TopicDescription description = descriptions.get(topic);
+                if (description != null && description.partitions().size() == 
expectedPartitions) {
+                    return;
+                }
+            } catch (ExecutionException ignored) {
+                // Topic not ready yet, continue polling
+            }
+            Thread.sleep(100);
+        }
+        throw new RuntimeException("Topic " + topic + " not ready after " + 
timeoutMillis + "ms");
+    }
+
+    private Properties getConsumerProperties(final String groupId) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
+        // Use shorter session timeout to speed up rebalance detection
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
+        return props;
+    }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 336f6e77d9..c1658822cb 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -435,12 +435,23 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
                 final Iterator<ByteRecord> consumerRecords = 
consumerService.poll(maxWaitDuration).iterator();
                 if (!consumerRecords.hasNext()) {
                     getLogger().trace("No Kafka Records consumed: {}", 
pollingContext);
+                    // Check if a rebalance occurred during poll - if so, 
break to commit what we have
+                    if (consumerService.hasRevokedPartitions()) {
+                        getLogger().debug("Rebalance detected with revoked 
partitions, breaking to commit session");
+                        break;
+                    }
                     continue;
                 }
 
                 recordsReceived = true;
                 processConsumerRecords(context, session, offsetTracker, 
consumerRecords);
 
+                // Check if a rebalance occurred during poll - if so, break to 
commit what we have
+                if (consumerService.hasRevokedPartitions()) {
+                    getLogger().debug("Rebalance detected with revoked 
partitions, breaking to commit session");
+                    break;
+                }
+
                 if (maxUncommittedSizeConfigured) {
                     // Stop consuming before reaching Max Uncommitted Time 
when exceeding Max Uncommitted Size
                     final long totalRecordSize = 
offsetTracker.getTotalRecordSize();
@@ -460,12 +471,24 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
             }
         }
 
-        if (!recordsReceived) {
+        if (!recordsReceived && !consumerService.hasRevokedPartitions()) {
             getLogger().trace("No Kafka Records consumed, re-queuing 
consumer");
             consumerServices.offer(consumerService);
             return;
         }
 
+        // If no records received but we have revoked partitions, we still 
need to commit their offsets
+        if (!recordsReceived && consumerService.hasRevokedPartitions()) {
+            getLogger().debug("No records received but rebalance occurred, 
committing offsets for revoked partitions");
+            try {
+                consumerService.commitOffsetsForRevokedPartitions();
+            } catch (final Exception e) {
+                getLogger().warn("Failed to commit offsets for revoked 
partitions", e);
+            }
+            consumerServices.offer(consumerService);
+            return;
+        }
+
         session.commitAsync(
             () -> commitOffsets(consumerService, offsetTracker, 
pollingContext, session),
             throwable -> {
@@ -485,6 +508,12 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
                 });
             }
 
+            // After successful session commit, also commit offsets for any 
partitions that were revoked during rebalance
+            if (consumerService.hasRevokedPartitions()) {
+                getLogger().debug("Committing offsets for partitions revoked 
during rebalance");
+                consumerService.commitOffsetsForRevokedPartitions();
+            }
+
             consumerServices.offer(consumerService);
             getLogger().debug("Committed offsets for Kafka Consumer Service");
         } catch (final Exception e) {
@@ -496,6 +525,8 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
     private void rollback(final KafkaConsumerService consumerService, final 
OffsetTracker offsetTracker, final ProcessSession session) {
         if (!consumerService.isClosed()) {
             try {
+                // Clear any pending revoked partitions since we're rolling 
back
+                consumerService.clearRevokedPartitions();
                 consumerService.rollback();
                 consumerServices.offer(consumerService);
                 getLogger().debug("Rolled back offsets for Kafka Consumer 
Service");
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
index 1a0e36affc..310f31ad26 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
@@ -22,6 +22,8 @@ import org.apache.nifi.kafka.service.api.record.ByteRecord;
 
 import java.io.Closeable;
 import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.OptionalLong;
 
@@ -67,4 +69,41 @@ public interface KafkaConsumerService extends Closeable {
      * @return OptionalLong containing the current lag or empty when not available
      */
     OptionalLong currentLag(TopicPartitionSummary topicPartitionSummary);
+
+    /**
+     * Check if a Kafka consumer group rebalance has occurred and partitions were revoked.
+     * When partitions are revoked, the processor should commit its session before calling
+     * {@link #commitOffsetsForRevokedPartitions()} to avoid data loss.
+     *
+     * @return <code>true</code> if partitions have been revoked and are pending commit; <code>false</code> otherwise
+     */
+    default boolean hasRevokedPartitions() {
+        return false;
+    }
+
+    /**
+     * Get the collection of partitions that were revoked during a rebalance and are pending commit.
+     * This does not clear the revoked partitions; call {@link #commitOffsetsForRevokedPartitions()}
+     * or {@link #clearRevokedPartitions()} to clear them.
+     *
+     * @return Collection of revoked partition states, or empty collection if none
+     */
+    default Collection<PartitionState> getRevokedPartitions() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Commit offsets for partitions that were revoked during a rebalance.
+     * This method should be called by the processor AFTER successfully committing its session
+     * to ensure no data loss occurs. After calling this method, the revoked partitions are cleared.
+     */
+    default void commitOffsetsForRevokedPartitions() {
+    }
+
+    /**
+     * Clear the revoked partitions without committing their offsets.
+     * This should be called when the processor decides not to commit (e.g., on rollback).
+     */
+    default void clearRevokedPartitions() {
+    }
 }
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
index 250594c60c..0af2d79ede 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
@@ -37,7 +37,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -45,9 +44,10 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.regex.Pattern;
-
-import static java.util.stream.Collectors.toList;
+import java.util.stream.Collectors;
 
 /**
  * Kafka 3 Consumer Service implementation with Object Pooling for subscribed Kafka Consumers
@@ -57,6 +57,8 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
     private final ComponentLog componentLog;
     private final Consumer<byte[], byte[]> consumer;
     private final Subscription subscription;
+    private final Map<TopicPartition, Long> uncommittedOffsets = new 
ConcurrentHashMap<>();
+    private final Set<TopicPartition> revokedPartitions = new 
CopyOnWriteArraySet<>();
     private volatile boolean closed = false;
 
     public Kafka3ConsumerService(final ComponentLog componentLog, final 
Consumer<byte[], byte[]> consumer, final Subscription subscription) {
@@ -82,7 +84,19 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
     @Override
     public void onPartitionsRevoked(final Collection<TopicPartition> 
partitions) {
         componentLog.info("Kafka revoked the following Partitions from this 
consumer: {}", partitions);
-        rollback(new HashSet<>(partitions));
+
+        // Store revoked partitions for the processor to handle after 
committing its session.
+        // We do NOT commit offsets here to avoid data loss - the processor 
must commit its
+        // session first, then call commitOffsetsForRevokedPartitions().
+        for (final TopicPartition partition : partitions) {
+            if (uncommittedOffsets.containsKey(partition)) {
+                revokedPartitions.add(partition);
+            }
+        }
+
+        if (!revokedPartitions.isEmpty()) {
+            componentLog.info("Partitions revoked with uncommitted offsets, 
pending processor commit: {}", revokedPartitions);
+        }
     }
 
     @Override
@@ -91,7 +105,10 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
 
         final long started = System.currentTimeMillis();
         consumer.commitSync(offsets);
-        final long elapsed = started - System.currentTimeMillis();
+        final long elapsed = System.currentTimeMillis() - started;
+
+        // Clear tracked offsets for committed partitions
+        offsets.keySet().forEach(uncommittedOffsets::remove);
 
         componentLog.debug("Committed Records in [{} ms] for {}", elapsed, 
pollingSummary);
     }
@@ -106,6 +123,11 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
             return;
         }
 
+        // Clear tracked offsets for rolled back partitions
+        partitions.forEach(uncommittedOffsets::remove);
+        // Clear any revoked partitions that are being rolled back
+        revokedPartitions.removeAll(partitions);
+
         try {
             final Map<TopicPartition, OffsetAndMetadata> metadataMap = 
consumer.committed(partitions);
             for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
metadataMap.entrySet()) {
@@ -137,7 +159,7 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
             return List.of();
         }
 
-        return new RecordIterable(consumerRecords);
+        return new RecordIterable(consumerRecords, uncommittedOffsets);
     }
 
     @Override
@@ -153,7 +175,7 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
                 .map(partitionInfo -> new PartitionState(
                         partitionInfo.topic(),
                         partitionInfo.partition()))
-                .collect(toList());
+                .collect(Collectors.toList());
         } else {
             partitionStates = Collections.emptyList();
         }
@@ -179,6 +201,53 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
         consumer.close();
     }
 
+    @Override
+    public boolean hasRevokedPartitions() {
+        return !revokedPartitions.isEmpty();
+    }
+
+    @Override
+    public Collection<PartitionState> getRevokedPartitions() {
+        return revokedPartitions.stream()
+                .map(tp -> new PartitionState(tp.topic(), tp.partition()))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void commitOffsetsForRevokedPartitions() {
+        if (revokedPartitions.isEmpty()) {
+            return;
+        }
+
+        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+        for (final TopicPartition partition : revokedPartitions) {
+            final Long offset = uncommittedOffsets.remove(partition);
+            if (offset != null) {
+                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
+            }
+        }
+
+        if (!offsetsToCommit.isEmpty()) {
+            try {
+                consumer.commitSync(offsetsToCommit);
+                componentLog.info("Committed offsets for revoked partitions after processor commit: {}", offsetsToCommit);
+            } catch (final Exception e) {
+                componentLog.warn("Failed to commit offsets for revoked partitions", e);
+            }
+        }
+
+        revokedPartitions.clear();
+    }
+
+    @Override
+    public void clearRevokedPartitions() {
+        // Remove the uncommitted offsets for revoked partitions without committing
+        for (final TopicPartition partition : revokedPartitions) {
+            uncommittedOffsets.remove(partition);
+        }
+        revokedPartitions.clear();
+    }
+
     private Map<TopicPartition, OffsetAndMetadata> getOffsets(final PollingSummary pollingSummary) {
         final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 
@@ -200,8 +269,9 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
     private static class RecordIterable implements Iterable<ByteRecord> {
         private final Iterator<ByteRecord> records;
 
-        private RecordIterable(final Iterable<ConsumerRecord<byte[], byte[]>> consumerRecords) {
-            this.records = new RecordIterator(consumerRecords);
+        private RecordIterable(final Iterable<ConsumerRecord<byte[], byte[]>> consumerRecords,
+                               final Map<TopicPartition, Long> uncommittedOffsets) {
+            this.records = new RecordIterator(consumerRecords, uncommittedOffsets);
         }
 
         @Override
@@ -212,9 +282,13 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
 
     private static class RecordIterator implements Iterator<ByteRecord> {
         private final Iterator<ConsumerRecord<byte[], byte[]>> consumerRecords;
+        private final Map<TopicPartition, Long> uncommittedOffsets;
+        private TopicPartition currentTopicPartition;
 
-        private RecordIterator(final Iterable<ConsumerRecord<byte[], byte[]>> records) {
+        private RecordIterator(final Iterable<ConsumerRecord<byte[], byte[]>> records,
+                               final Map<TopicPartition, Long> uncommittedOffsets) {
             this.consumerRecords = records.iterator();
+            this.uncommittedOffsets = uncommittedOffsets;
         }
 
         @Override
@@ -225,6 +299,12 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
         @Override
         public ByteRecord next() {
             final ConsumerRecord<byte[], byte[]> consumerRecord = consumerRecords.next();
+
+            // Track the offset for potential commit during rebalance
+            // Store offset + 1 because Kafka commits the next offset to consume
+            final TopicPartition topicPartition = getTopicPartition(consumerRecord);
+            uncommittedOffsets.merge(topicPartition, consumerRecord.offset() + 1, Math::max);
+
             final List<RecordHeader> recordHeaders = new ArrayList<>();
             consumerRecord.headers().forEach(header -> {
                 final RecordHeader recordHeader = new RecordHeader(header.key(), header.value());
@@ -248,5 +328,14 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
                     1
             );
         }
+
+        private TopicPartition getTopicPartition(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+            if (currentTopicPartition == null
+                    || !currentTopicPartition.topic().equals(consumerRecord.topic())
+                    || currentTopicPartition.partition() != consumerRecord.partition()) {
+                currentTopicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
+            }
+            return currentTopicPartition;
+        }
     }
 }
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerServiceTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerServiceTest.java
new file mode 100644
index 0000000000..4463c499df
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerServiceTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.consumer;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class Kafka3ConsumerServiceTest {
+
+    private static final String TOPIC = "test-topic";
+    private static final String GROUP_ID = "test-group";
+    private static final int PARTITION_0 = 0;
+    private static final int PARTITION_1 = 1;
+
+    @Mock
+    private Consumer<byte[], byte[]> consumer;
+
+    @Mock
+    private ComponentLog componentLog;
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> offsetsCaptor;
+
+    private Kafka3ConsumerService consumerService;
+
+    @BeforeEach
+    void setUp() {
+        final Subscription subscription = new Subscription(GROUP_ID, Collections.singletonList(TOPIC), AutoOffsetReset.EARLIEST);
+        consumerService = new Kafka3ConsumerService(componentLog, consumer, subscription);
+    }
+
+    @Test
+    void testOnPartitionsRevokedStoresPartitionsForLaterCommit() {
+        final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+        final TopicPartition partition1 = new TopicPartition(TOPIC, PARTITION_1);
+
+        final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+        final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_1, 10L);
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+        recordsMap.put(partition0, List.of(record0));
+        recordsMap.put(partition1, List.of(record1));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+        final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+        for (ByteRecord ignored : polledRecords) {
+        }
+
+        assertFalse(consumerService.hasRevokedPartitions());
+
+        final Collection<TopicPartition> revokedPartitions = List.of(partition0, partition1);
+        consumerService.onPartitionsRevoked(revokedPartitions);
+
+        assertTrue(consumerService.hasRevokedPartitions());
+        assertEquals(2, consumerService.getRevokedPartitions().size());
+        verify(consumer, never()).commitSync(anyMap());
+    }
+
+    @Test
+    void testCommitOffsetsForRevokedPartitions() {
+        final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+        final TopicPartition partition1 = new TopicPartition(TOPIC, PARTITION_1);
+
+        final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+        final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_1, 10L);
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+        recordsMap.put(partition0, List.of(record0));
+        recordsMap.put(partition1, List.of(record1));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+        final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+        for (ByteRecord ignored : polledRecords) {
+        }
+
+        consumerService.onPartitionsRevoked(List.of(partition0, partition1));
+        consumerService.commitOffsetsForRevokedPartitions();
+
+        verify(consumer).commitSync(offsetsCaptor.capture());
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = offsetsCaptor.getValue();
+
+        assertEquals(2, committedOffsets.size());
+        assertEquals(6L, committedOffsets.get(partition0).offset());
+        assertEquals(11L, committedOffsets.get(partition1).offset());
+        assertFalse(consumerService.hasRevokedPartitions());
+    }
+
+    @Test
+    void testOnPartitionsRevokedWithNoUncommittedOffsets() {
+        final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+        consumerService.onPartitionsRevoked(List.of(partition0));
+        assertFalse(consumerService.hasRevokedPartitions());
+        verify(consumer, never()).commitSync(anyMap());
+    }
+
+    @Test
+    void testOnPartitionsRevokedOnlyTracksPartitionsWithUncommittedOffsets() {
+        final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+        final TopicPartition partition1 = new TopicPartition(TOPIC, PARTITION_1);
+
+        final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+        final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_1, 10L);
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+        recordsMap.put(partition0, List.of(record0));
+        recordsMap.put(partition1, List.of(record1));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+        final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+        for (ByteRecord ignored : polledRecords) {
+        }
+
+        consumerService.onPartitionsRevoked(List.of(partition0));
+        consumerService.commitOffsetsForRevokedPartitions();
+
+        verify(consumer).commitSync(offsetsCaptor.capture());
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = offsetsCaptor.getValue();
+
+        assertEquals(1, committedOffsets.size());
+        assertEquals(6L, committedOffsets.get(partition0).offset());
+        assertFalse(committedOffsets.containsKey(partition1));
+    }
+
+    @Test
+    void testCommitOffsetsForRevokedPartitionsTracksMaxOffset() {
+        final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+
+        final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_0, 5L);
+        final ConsumerRecord<byte[], byte[]> record2 = createRecord(TOPIC, PARTITION_0, 7L);
+        final ConsumerRecord<byte[], byte[]> record3 = createRecord(TOPIC, PARTITION_0, 6L);
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+        recordsMap.put(partition0, List.of(record1, record2, record3));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+        final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+        for (ByteRecord ignored : polledRecords) {
+        }
+
+        consumerService.onPartitionsRevoked(List.of(partition0));
+        consumerService.commitOffsetsForRevokedPartitions();
+
+        verify(consumer).commitSync(offsetsCaptor.capture());
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = offsetsCaptor.getValue();
+
+        assertEquals(1, committedOffsets.size());
+        assertEquals(8L, committedOffsets.get(partition0).offset());
+    }
+
+    @Test
+    void testRollbackClearsRevokedPartitions() {
+        final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+
+        final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+        recordsMap.put(partition0, List.of(record0));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+        when(consumer.assignment()).thenReturn(Collections.singleton(partition0));
+        when(consumer.committed(any())).thenReturn(Collections.singletonMap(partition0, new OffsetAndMetadata(0L)));
+
+        final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+        for (ByteRecord ignored : polledRecords) {
+        }
+
+        consumerService.onPartitionsRevoked(List.of(partition0));
+        assertTrue(consumerService.hasRevokedPartitions());
+
+        consumerService.rollback();
+        assertFalse(consumerService.hasRevokedPartitions());
+
+        consumerService.commitOffsetsForRevokedPartitions();
+        verify(consumer, never()).commitSync(anyMap());
+    }
+
+    @Test
+    void testClearRevokedPartitionsWithoutCommitting() {
+        final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+
+        final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+        recordsMap.put(partition0, List.of(record0));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+        final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+        for (ByteRecord ignored : polledRecords) {
+        }
+
+        consumerService.onPartitionsRevoked(List.of(partition0));
+        assertTrue(consumerService.hasRevokedPartitions());
+
+        consumerService.clearRevokedPartitions();
+        assertFalse(consumerService.hasRevokedPartitions());
+
+        verify(consumer, never()).commitSync(anyMap());
+    }
+
+    private ConsumerRecord<byte[], byte[]> createRecord(final String topic, final int partition, final long offset) {
+        return new ConsumerRecord<>(
+                topic,
+                partition,
+                offset,
+                System.currentTimeMillis(),
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                null,
+                "test-value".getBytes(),
+                new RecordHeaders(),
+                Optional.empty()
+        );
+    }
+
+    private ConsumerRecords<byte[], byte[]> createConsumerRecords(
+            final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap) {
+        // Calculate next offsets from the records (max offset + 1 for each partition)
+        final Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
+        for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry : recordsMap.entrySet()) {
+            long maxOffset = entry.getValue().stream()
+                    .mapToLong(ConsumerRecord::offset)
+                    .max()
+                    .orElse(-1L);
+            nextOffsets.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1));
+        }
+        return new ConsumerRecords<>(recordsMap, nextOffsets);
+    }
+}

Reply via email to