This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ede26f48d7 NIFI-15464 Commit pending offsets for revoked partitions in ConsumeKafka (#10769)
ede26f48d7 is described below
commit ede26f48d7ee3d6b54b8bc4da5cda205abcc9917
Author: Pierre Villard <[email protected]>
AuthorDate: Sat Feb 7 20:12:07 2026 +0100
NIFI-15464 Commit pending offsets for revoked partitions in ConsumeKafka (#10769)
Signed-off-by: David Handermann <[email protected]>
---
.../kafka/processors/ConsumeKafkaRebalanceIT.java | 360 +++++++++++++++++++++
.../apache/nifi/kafka/processors/ConsumeKafka.java | 33 +-
.../service/api/consumer/KafkaConsumerService.java | 39 +++
.../service/consumer/Kafka3ConsumerService.java | 109 ++++++-
.../consumer/Kafka3ConsumerServiceTest.java | 287 ++++++++++++++++
5 files changed, 817 insertions(+), 11 deletions(-)
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java
new file mode 100644
index 0000000000..d4a8103abe
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.consumer.Subscription;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Integration tests for verifying that ConsumeKafka correctly handles consumer group rebalances
+ * without causing duplicate message processing.
+ */
+class ConsumeKafkaRebalanceIT extends AbstractConsumeKafkaIT {
+
+ private static final int NUM_PARTITIONS = 3;
+ private static final int MESSAGES_PER_PARTITION = 20;
+
+ /**
+ * Tests that when onPartitionsRevoked is called (simulating rebalance), the consumer
+ * correctly commits offsets, and a subsequent consumer in the same group doesn't
+ * re-consume the same messages (no duplicates).
+ *
+ * This test:
+ * 1. Produces messages to a multi-partition topic
+ * 2. Consumer 1 polls and processes messages
+ * 3. Simulates rebalance by calling onPartitionsRevoked on Consumer 1
+ * 4. Consumer 2 joins and continues consuming from committed offsets
+ * 5. Verifies no duplicate messages were consumed
+ */
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ void testRebalanceDoesNotCauseDuplicates() throws Exception {
+ final String topic = "rebalance-test-" + UUID.randomUUID();
+ final String groupId = "rebalance-group-" + UUID.randomUUID();
+ final int totalMessages = NUM_PARTITIONS * MESSAGES_PER_PARTITION;
+
+ createTopic(topic, NUM_PARTITIONS);
+ produceMessagesToTopic(topic, NUM_PARTITIONS, MESSAGES_PER_PARTITION);
+
+ final Set<String> consumedMessages = new HashSet<>();
+ final AtomicInteger duplicateCount = new AtomicInteger(0);
+ final ComponentLog mockLog = mock(ComponentLog.class);
+
+ final Properties props1 = getConsumerProperties(groupId);
+ try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new KafkaConsumer<>(props1)) {
+ final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+ final Kafka3ConsumerService service1 = new Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);
+
+ int consumer1Count = 0;
+ int maxAttempts = 20;
+ while (consumer1Count < totalMessages / 2 && maxAttempts-- > 0) {
+ for (ByteRecord record : service1.poll(Duration.ofSeconds(2))) {
+ final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
+ if (!consumedMessages.add(messageId)) {
+ duplicateCount.incrementAndGet();
+ }
+ consumer1Count++;
+ }
+ }
+
+ final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
+ service1.onPartitionsRevoked(assignment);
+ // Simulate processor committing offsets after successful session commit
+ service1.commitOffsetsForRevokedPartitions();
+ service1.close();
+ }
+
+ final Properties props2 = getConsumerProperties(groupId);
+ try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new KafkaConsumer<>(props2)) {
+ final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+ final Kafka3ConsumerService service2 = new Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);
+
+ int emptyPolls = 0;
+ while (emptyPolls < 5 && consumedMessages.size() < totalMessages) {
+ boolean hasRecords = false;
+ for (ByteRecord record : service2.poll(Duration.ofSeconds(2))) {
+ hasRecords = true;
+ final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
+ if (!consumedMessages.add(messageId)) {
+ duplicateCount.incrementAndGet();
+ }
+ }
+ if (!hasRecords) {
+ emptyPolls++;
+ } else {
+ emptyPolls = 0;
+ }
+ }
+
+ service2.close();
+ }
+
+ assertEquals(0, duplicateCount.get(),
+ "Expected no duplicate messages but found " + duplicateCount.get());
+ assertEquals(totalMessages, consumedMessages.size(),
+ "Expected to consume " + totalMessages + " unique messages but got " + consumedMessages.size());
+ }
+
+ /**
+ * Tests that offsets can be committed after rebalance when processor calls commitOffsetsForRevokedPartitions.
+ *
+ * This test:
+ * 1. Creates a consumer and polls messages
+ * 2. Manually invokes onPartitionsRevoked (simulating what Kafka does during rebalance)
+ * 3. Calls commitOffsetsForRevokedPartitions (simulating processor committing after session commit)
+ * 4. Verifies that offsets were committed to Kafka
+ */
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ void testOffsetsCommittedDuringRebalance() throws Exception {
+ final String topic = "rebalance-offset-test-" + UUID.randomUUID();
+ final String groupId = "rebalance-offset-group-" + UUID.randomUUID();
+ final int messagesPerPartition = 10;
+
+ createTopic(topic, NUM_PARTITIONS);
+ produceMessagesToTopic(topic, NUM_PARTITIONS, messagesPerPartition);
+
+ final ComponentLog mockLog = mock(ComponentLog.class);
+ final Properties props = getConsumerProperties(groupId);
+
+ try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props)) {
+ final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+ final Kafka3ConsumerService service = new Kafka3ConsumerService(mockLog, kafkaConsumer, subscription);
+
+ int polledCount = 0;
+ int maxAttempts = 20;
+ while (polledCount < 15 && maxAttempts-- > 0) {
+ for (ByteRecord ignored : service.poll(Duration.ofSeconds(2))) {
+ polledCount++;
+ }
+ }
+
+ assertTrue(polledCount > 0, "Should have polled at least some messages");
+
+ final Set<TopicPartition> assignment = kafkaConsumer.assignment();
+ assertFalse(assignment.isEmpty(), "Consumer should have partition assignments");
+
+ service.onPartitionsRevoked(assignment);
+ // Simulate processor committing offsets after successful session commit
+ service.commitOffsetsForRevokedPartitions();
+ service.close();
+ }
+
+ try (KafkaConsumer<byte[], byte[]> verifyConsumer = new KafkaConsumer<>(getConsumerProperties(groupId))) {
+ final Set<TopicPartition> partitions = new HashSet<>();
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ partitions.add(new TopicPartition(topic, i));
+ }
+
+ final Map<TopicPartition, OffsetAndMetadata> committedOffsets = verifyConsumer.committed(partitions);
+
+ long totalCommitted = committedOffsets.values().stream()
+ .filter(o -> o != null)
+ .mapToLong(OffsetAndMetadata::offset)
+ .sum();
+
+ assertTrue(totalCommitted > 0,
+ "Expected offsets to be committed after commitOffsetsForRevokedPartitions, but total committed offset was " + totalCommitted);
+ }
+ }
+
+ /**
+ * Tests that records are NOT lost when a rebalance occurs before processing is complete.
+ *
+ * This test simulates the scenario where:
+ * 1. Consumer polls and iterates through records (tracking offsets internally)
+ * 2. Rebalance occurs (onPartitionsRevoked called) BEFORE the processor commits its session
+ * 3. Consumer "fails" (simulating crash or processing failure) without committing offsets
+ * 4. New consumer joins with the same group
+ * 5. The new consumer receives the same records since they were never successfully processed
+ */
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ void testNoDataLossWhenRebalanceOccursBeforeProcessingComplete() throws Exception {
+ final String topic = "dataloss-test-" + UUID.randomUUID();
+ final String groupId = "dataloss-group-" + UUID.randomUUID();
+ final int messagesPerPartition = 10;
+ final int totalMessages = NUM_PARTITIONS * messagesPerPartition;
+
+ createTopic(topic, NUM_PARTITIONS);
+ produceMessagesToTopic(topic, NUM_PARTITIONS, messagesPerPartition);
+
+ final ComponentLog mockLog = mock(ComponentLog.class);
+ int recordsPolledByFirstConsumer = 0;
+
+ // Consumer 1: Poll and iterate records, then rebalance occurs, but processing "fails"
+ final Properties props1 = getConsumerProperties(groupId);
+ try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new KafkaConsumer<>(props1)) {
+ final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+ final Kafka3ConsumerService service1 = new Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);
+
+ // Poll and iterate through records - this tracks offsets internally
+ int maxAttempts = 20;
+ while (recordsPolledByFirstConsumer < totalMessages && maxAttempts-- > 0) {
+ for (ByteRecord ignored : service1.poll(Duration.ofSeconds(2))) {
+ recordsPolledByFirstConsumer++;
+ }
+ }
+
+ assertTrue(recordsPolledByFirstConsumer > 0, "First consumer should have polled some records");
+
+ // Simulate rebalance occurring before processor commits its session
+ final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
+ assertFalse(assignment.isEmpty(), "Consumer should have partition assignments");
+ service1.onPartitionsRevoked(assignment);
+
+ // DO NOT call any "commit" or "process" method - simulating that the processor
+ // never completed processing (e.g., session commit failed, process crashed, etc.)
+
+ service1.close();
+ }
+
+ // Consumer 2: Should receive the SAME records because processing was never completed
+ int recordsPolledBySecondConsumer = 0;
+ final Properties props2 = getConsumerProperties(groupId);
+ try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new KafkaConsumer<>(props2)) {
+ final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+ final Kafka3ConsumerService service2 = new Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);
+
+ // Poll for records - if no data loss, we should get the same records again
+ int emptyPolls = 0;
+ while (emptyPolls < 5) {
+ boolean hasRecords = false;
+ for (ByteRecord ignored : service2.poll(Duration.ofSeconds(2))) {
+ hasRecords = true;
+ recordsPolledBySecondConsumer++;
+ }
+ if (!hasRecords) {
+ emptyPolls++;
+ } else {
+ emptyPolls = 0;
+ }
+ }
+
+ service2.close();
+ }
+
+ // Records should NOT be lost - the second consumer should receive
+ // at least the records that were polled by the first consumer but never processed
+ assertTrue(recordsPolledBySecondConsumer >= recordsPolledByFirstConsumer,
+ "Data loss detected! First consumer polled " + recordsPolledByFirstConsumer +
+ " records but second consumer only received " + recordsPolledBySecondConsumer +
+ " records. Expected second consumer to receive at least " + recordsPolledByFirstConsumer +
+ " records since processing was never completed.");
+ }
+
+ /**
+ * Produces messages to a specific topic with a given number of partitions.
+ */
+ private void produceMessagesToTopic(final String topic, final int numPartitions, final int messagesPerPartition) throws Exception {
+ final Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ for (int partition = 0; partition < numPartitions; partition++) {
+ for (int i = 0; i < messagesPerPartition; i++) {
+ final String key = "key-" + partition + "-" + i;
+ final String value = "value-" + partition + "-" + i;
+ producer.send(new ProducerRecord<>(topic, partition, key, value)).get();
+ }
+ }
+ }
+ }
+
+ private void createTopic(final String topic, final int numPartitions) throws Exception {
+ final Properties adminProps = new Properties();
+ adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+
+ try (Admin admin = Admin.create(adminProps)) {
+ final NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
+ admin.createTopics(Collections.singletonList(newTopic)).all().get(30, TimeUnit.SECONDS);
+ waitForTopicReady(admin, topic, numPartitions);
+ }
+ }
+
+ private void waitForTopicReady(final Admin admin, final String topic, final int expectedPartitions) throws Exception {
+ final long startTime = System.currentTimeMillis();
+ final long timeoutMillis = 30000;
+
+ while (System.currentTimeMillis() - startTime < timeoutMillis) {
+ try {
+ final Map<String, TopicDescription> descriptions = admin.describeTopics(Collections.singletonList(topic))
+ .allTopicNames()
+ .get(10, TimeUnit.SECONDS);
+ final TopicDescription description = descriptions.get(topic);
+ if (description != null && description.partitions().size() == expectedPartitions) {
+ return;
+ }
+ } catch (ExecutionException ignored) {
+ // Topic not ready yet, continue polling
+ }
+ Thread.sleep(100);
+ }
+ throw new RuntimeException("Topic " + topic + " not ready after " + timeoutMillis + "ms");
+ }
+
+ private Properties getConsumerProperties(final String groupId) {
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
+ // Use shorter session timeout to speed up rebalance detection
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+ props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
+ return props;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 336f6e77d9..c1658822cb 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -435,12 +435,23 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
if (!consumerRecords.hasNext()) {
getLogger().trace("No Kafka Records consumed: {}", pollingContext);
+ // Check if a rebalance occurred during poll - if so, break to commit what we have
+ if (consumerService.hasRevokedPartitions()) {
+ getLogger().debug("Rebalance detected with revoked partitions, breaking to commit session");
+ break;
+ }
continue;
}
recordsReceived = true;
processConsumerRecords(context, session, offsetTracker, consumerRecords);
+ // Check if a rebalance occurred during poll - if so, break to commit what we have
+ if (consumerService.hasRevokedPartitions()) {
+ getLogger().debug("Rebalance detected with revoked partitions, breaking to commit session");
+ break;
+ }
+
if (maxUncommittedSizeConfigured) {
// Stop consuming before reaching Max Uncommitted Time when exceeding Max Uncommitted Size
final long totalRecordSize = offsetTracker.getTotalRecordSize();
@@ -460,12 +471,24 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
}
}
- if (!recordsReceived) {
+ if (!recordsReceived && !consumerService.hasRevokedPartitions()) {
getLogger().trace("No Kafka Records consumed, re-queuing consumer");
consumerServices.offer(consumerService);
return;
}
+ // If no records received but we have revoked partitions, we still need to commit their offsets
+ if (!recordsReceived && consumerService.hasRevokedPartitions()) {
+ getLogger().debug("No records received but rebalance occurred, committing offsets for revoked partitions");
+ try {
+ consumerService.commitOffsetsForRevokedPartitions();
+ } catch (final Exception e) {
+ getLogger().warn("Failed to commit offsets for revoked partitions", e);
+ }
+ consumerServices.offer(consumerService);
+ return;
+ }
+
session.commitAsync(
() -> commitOffsets(consumerService, offsetTracker, pollingContext, session),
throwable -> {
@@ -485,6 +508,12 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
});
}
+ // After successful session commit, also commit offsets for any partitions that were revoked during rebalance
+ if (consumerService.hasRevokedPartitions()) {
+ getLogger().debug("Committing offsets for partitions revoked during rebalance");
+ consumerService.commitOffsetsForRevokedPartitions();
+ }
+
consumerServices.offer(consumerService);
getLogger().debug("Committed offsets for Kafka Consumer Service");
} catch (final Exception e) {
@@ -496,6 +525,8 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
private void rollback(final KafkaConsumerService consumerService, final OffsetTracker offsetTracker, final ProcessSession session) {
if (!consumerService.isClosed()) {
try {
+ // Clear any pending revoked partitions since we're rolling back
+ consumerService.clearRevokedPartitions();
consumerService.rollback();
consumerServices.offer(consumerService);
getLogger().debug("Rolled back offsets for Kafka Consumer Service");
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
index 1a0e36affc..310f31ad26 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
@@ -22,6 +22,8 @@ import org.apache.nifi.kafka.service.api.record.ByteRecord;
import java.io.Closeable;
import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
@@ -67,4 +69,41 @@ public interface KafkaConsumerService extends Closeable {
* @return OptionalLong containing the current lag or empty when not available
*/
OptionalLong currentLag(TopicPartitionSummary topicPartitionSummary);
+
+ /**
+ * Check if a Kafka consumer group rebalance has occurred and partitions were revoked.
+ * When partitions are revoked, the processor should commit its session before calling
+ * {@link #commitOffsetsForRevokedPartitions()} to avoid data loss.
+ *
+ * @return <code>true</code> if partitions have been revoked and are pending commit; <code>false</code> otherwise
+ */
+ default boolean hasRevokedPartitions() {
+ return false;
+ }
+
+ /**
+ * Get the collection of partitions that were revoked during a rebalance and are pending commit.
+ * This does not clear the revoked partitions; call {@link #commitOffsetsForRevokedPartitions()}
+ * or {@link #clearRevokedPartitions()} to clear them.
+ *
+ * @return Collection of revoked partition states, or empty collection if none
+ */
+ default Collection<PartitionState> getRevokedPartitions() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Commit offsets for partitions that were revoked during a rebalance.
+ * This method should be called by the processor AFTER successfully committing its session
+ * to ensure no data loss occurs. After calling this method, the revoked partitions are cleared.
+ */
+ default void commitOffsetsForRevokedPartitions() {
+ }
+
+ /**
+ * Clear the revoked partitions without committing their offsets.
+ * This should be called when the processor decides not to commit (e.g., on rollback).
+ */
+ default void clearRevokedPartitions() {
+ }
}
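A short usage sketch of the contract described in the Javadoc above: poll, check hasRevokedPartitions(), commit the processor's own session first, then commit offsets for the revoked partitions, or clear them and roll back on failure. This is illustrative only and not part of the commit; processRecord() and commitProcessorSession() are hypothetical stand-ins for NiFi ProcessSession handling.

import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.record.ByteRecord;

import java.time.Duration;

class RevokedPartitionUsageSketch {

    void consumeOnce(final KafkaConsumerService consumerService) {
        // Poll and process records; the service tracks uncommitted offsets per partition
        for (final ByteRecord record : consumerService.poll(Duration.ofSeconds(1))) {
            processRecord(record);
        }

        // If a rebalance revoked partitions during the poll, commit the processor session
        // first and only then commit the pending offsets for the revoked partitions
        if (consumerService.hasRevokedPartitions()) {
            try {
                commitProcessorSession();
                consumerService.commitOffsetsForRevokedPartitions();
            } catch (final Exception e) {
                consumerService.clearRevokedPartitions();
                consumerService.rollback();
            }
        }
    }

    void processRecord(final ByteRecord record) {
        // hypothetical: transfer the record content to a FlowFile
    }

    void commitProcessorSession() {
        // hypothetical: session.commitAsync(...) in the real processor
    }
}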
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
index 250594c60c..0af2d79ede 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
@@ -37,7 +37,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -45,9 +44,10 @@ import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.regex.Pattern;
-
-import static java.util.stream.Collectors.toList;
+import java.util.stream.Collectors;
/**
* Kafka 3 Consumer Service implementation with Object Pooling for subscribed Kafka Consumers
@@ -57,6 +57,8 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
private final ComponentLog componentLog;
private final Consumer<byte[], byte[]> consumer;
private final Subscription subscription;
+ private final Map<TopicPartition, Long> uncommittedOffsets = new ConcurrentHashMap<>();
+ private final Set<TopicPartition> revokedPartitions = new CopyOnWriteArraySet<>();
private volatile boolean closed = false;
public Kafka3ConsumerService(final ComponentLog componentLog, final Consumer<byte[], byte[]> consumer, final Subscription subscription) {
@@ -82,7 +84,19 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
componentLog.info("Kafka revoked the following Partitions from this consumer: {}", partitions);
- rollback(new HashSet<>(partitions));
+
+ // Store revoked partitions for the processor to handle after committing its session.
+ // We do NOT commit offsets here to avoid data loss - the processor must commit its
+ // session first, then call commitOffsetsForRevokedPartitions().
+ for (final TopicPartition partition : partitions) {
+ if (uncommittedOffsets.containsKey(partition)) {
+ revokedPartitions.add(partition);
+ }
+ }
+
+ if (!revokedPartitions.isEmpty()) {
+ componentLog.info("Partitions revoked with uncommitted offsets, pending processor commit: {}", revokedPartitions);
+ }
}
@Override
@@ -91,7 +105,10 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
final long started = System.currentTimeMillis();
consumer.commitSync(offsets);
- final long elapsed = started - System.currentTimeMillis();
+ final long elapsed = System.currentTimeMillis() - started;
+
+ // Clear tracked offsets for committed partitions
+ offsets.keySet().forEach(uncommittedOffsets::remove);
componentLog.debug("Committed Records in [{} ms] for {}", elapsed, pollingSummary);
}
@@ -106,6 +123,11 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
return;
}
+ // Clear tracked offsets for rolled back partitions
+ partitions.forEach(uncommittedOffsets::remove);
+ // Clear any revoked partitions that are being rolled back
+ revokedPartitions.removeAll(partitions);
+
try {
final Map<TopicPartition, OffsetAndMetadata> metadataMap = consumer.committed(partitions);
for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : metadataMap.entrySet()) {
@@ -137,7 +159,7 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
return List.of();
}
- return new RecordIterable(consumerRecords);
+ return new RecordIterable(consumerRecords, uncommittedOffsets);
}
@Override
@@ -153,7 +175,7 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
.map(partitionInfo -> new PartitionState(
partitionInfo.topic(),
partitionInfo.partition()))
- .collect(toList());
+ .collect(Collectors.toList());
} else {
partitionStates = Collections.emptyList();
}
@@ -179,6 +201,53 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
consumer.close();
}
+ @Override
+ public boolean hasRevokedPartitions() {
+ return !revokedPartitions.isEmpty();
+ }
+
+ @Override
+ public Collection<PartitionState> getRevokedPartitions() {
+ return revokedPartitions.stream()
+ .map(tp -> new PartitionState(tp.topic(), tp.partition()))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void commitOffsetsForRevokedPartitions() {
+ if (revokedPartitions.isEmpty()) {
+ return;
+ }
+
+ final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+ for (final TopicPartition partition : revokedPartitions) {
+ final Long offset = uncommittedOffsets.remove(partition);
+ if (offset != null) {
+ offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
+ }
+ }
+
+ if (!offsetsToCommit.isEmpty()) {
+ try {
+ consumer.commitSync(offsetsToCommit);
+ componentLog.info("Committed offsets for revoked partitions after processor commit: {}", offsetsToCommit);
+ } catch (final Exception e) {
+ componentLog.warn("Failed to commit offsets for revoked partitions", e);
+ }
+ }
+
+ revokedPartitions.clear();
+ }
+
+ @Override
+ public void clearRevokedPartitions() {
+ // Remove the uncommitted offsets for revoked partitions without committing
+ for (final TopicPartition partition : revokedPartitions) {
+ uncommittedOffsets.remove(partition);
+ }
+ revokedPartitions.clear();
+ }
+
private Map<TopicPartition, OffsetAndMetadata> getOffsets(final PollingSummary pollingSummary) {
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
@@ -200,8 +269,9 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
private static class RecordIterable implements Iterable<ByteRecord> {
private final Iterator<ByteRecord> records;
- private RecordIterable(final Iterable<ConsumerRecord<byte[], byte[]>> consumerRecords) {
- this.records = new RecordIterator(consumerRecords);
+ private RecordIterable(final Iterable<ConsumerRecord<byte[], byte[]>> consumerRecords,
+ final Map<TopicPartition, Long> uncommittedOffsets) {
+ this.records = new RecordIterator(consumerRecords, uncommittedOffsets);
}
@Override
@@ -212,9 +282,13 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
private static class RecordIterator implements Iterator<ByteRecord> {
private final Iterator<ConsumerRecord<byte[], byte[]>> consumerRecords;
+ private final Map<TopicPartition, Long> uncommittedOffsets;
+ private TopicPartition currentTopicPartition;
- private RecordIterator(final Iterable<ConsumerRecord<byte[], byte[]>> records) {
+ private RecordIterator(final Iterable<ConsumerRecord<byte[], byte[]>> records,
+ final Map<TopicPartition, Long> uncommittedOffsets) {
this.consumerRecords = records.iterator();
+ this.uncommittedOffsets = uncommittedOffsets;
}
@Override
@@ -225,6 +299,12 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
@Override
public ByteRecord next() {
final ConsumerRecord<byte[], byte[]> consumerRecord = consumerRecords.next();
+
+ // Track the offset for potential commit during rebalance
+ // Store offset + 1 because Kafka commits the next offset to consume
+ final TopicPartition topicPartition = getTopicPartition(consumerRecord);
+ uncommittedOffsets.merge(topicPartition, consumerRecord.offset() + 1, Math::max);
+
final List<RecordHeader> recordHeaders = new ArrayList<>();
consumerRecord.headers().forEach(header -> {
final RecordHeader recordHeader = new RecordHeader(header.key(), header.value());
@@ -248,5 +328,14 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
1
);
}
+
+ private TopicPartition getTopicPartition(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+ if (currentTopicPartition == null
+ || !currentTopicPartition.topic().equals(consumerRecord.topic())
+ || currentTopicPartition.partition() != consumerRecord.partition()) {
+ currentTopicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
+ }
+ return currentTopicPartition;
+ }
}
}
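The RecordIterator change above records, per partition, the next offset to commit (highest consumed offset plus one, since Kafka treats a committed offset as the next record to read). A minimal standalone sketch of that bookkeeping, keyed by partition number for brevity, mirroring the 5/7/6 case exercised by the new unit test below; it is illustrative only and not part of the commit.

import java.util.HashMap;
import java.util.Map;

class OffsetTrackingSketch {
    public static void main(final String[] args) {
        // partition number -> next offset to commit
        final Map<Integer, Long> uncommittedOffsets = new HashMap<>();

        // records consumed out of order from partition 0
        for (final long consumedOffset : new long[] {5L, 7L, 6L}) {
            uncommittedOffsets.merge(0, consumedOffset + 1, Math::max);
        }

        System.out.println(uncommittedOffsets); // prints {0=8}
    }
}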
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerServiceTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerServiceTest.java
new file mode 100644
index 0000000000..4463c499df
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerServiceTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.consumer;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class Kafka3ConsumerServiceTest {
+
+ private static final String TOPIC = "test-topic";
+ private static final String GROUP_ID = "test-group";
+ private static final int PARTITION_0 = 0;
+ private static final int PARTITION_1 = 1;
+
+ @Mock
+ private Consumer<byte[], byte[]> consumer;
+
+ @Mock
+ private ComponentLog componentLog;
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> offsetsCaptor;
+
+ private Kafka3ConsumerService consumerService;
+
+ @BeforeEach
+ void setUp() {
+ final Subscription subscription = new Subscription(GROUP_ID, Collections.singletonList(TOPIC), AutoOffsetReset.EARLIEST);
+ consumerService = new Kafka3ConsumerService(componentLog, consumer, subscription);
+ }
+
+ @Test
+ void testOnPartitionsRevokedStoresPartitionsForLaterCommit() {
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+ final TopicPartition partition1 = new TopicPartition(TOPIC, PARTITION_1);
+
+ final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+ final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_1, 10L);
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+ recordsMap.put(partition0, List.of(record0));
+ recordsMap.put(partition1, List.of(record1));
+ final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+ final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+ for (ByteRecord ignored : polledRecords) {
+ }
+
+ assertFalse(consumerService.hasRevokedPartitions());
+
+ final Collection<TopicPartition> revokedPartitions = List.of(partition0, partition1);
+ consumerService.onPartitionsRevoked(revokedPartitions);
+
+ assertTrue(consumerService.hasRevokedPartitions());
+ assertEquals(2, consumerService.getRevokedPartitions().size());
+ verify(consumer, never()).commitSync(anyMap());
+ }
+
+ @Test
+ void testCommitOffsetsForRevokedPartitions() {
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+ final TopicPartition partition1 = new TopicPartition(TOPIC, PARTITION_1);
+
+ final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+ final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_1, 10L);
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+ recordsMap.put(partition0, List.of(record0));
+ recordsMap.put(partition1, List.of(record1));
+ final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+ final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+ for (ByteRecord ignored : polledRecords) {
+ }
+
+ consumerService.onPartitionsRevoked(List.of(partition0, partition1));
+ consumerService.commitOffsetsForRevokedPartitions();
+
+ verify(consumer).commitSync(offsetsCaptor.capture());
+ final Map<TopicPartition, OffsetAndMetadata> committedOffsets = offsetsCaptor.getValue();
+
+ assertEquals(2, committedOffsets.size());
+ assertEquals(6L, committedOffsets.get(partition0).offset());
+ assertEquals(11L, committedOffsets.get(partition1).offset());
+ assertFalse(consumerService.hasRevokedPartitions());
+ }
+
+ @Test
+ void testOnPartitionsRevokedWithNoUncommittedOffsets() {
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+ consumerService.onPartitionsRevoked(List.of(partition0));
+ assertFalse(consumerService.hasRevokedPartitions());
+ verify(consumer, never()).commitSync(anyMap());
+ }
+
+ @Test
+ void testOnPartitionsRevokedOnlyTracksPartitionsWithUncommittedOffsets() {
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+ final TopicPartition partition1 = new TopicPartition(TOPIC, PARTITION_1);
+
+ final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+ final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_1, 10L);
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+ recordsMap.put(partition0, List.of(record0));
+ recordsMap.put(partition1, List.of(record1));
+ final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+ final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+ for (ByteRecord ignored : polledRecords) {
+ }
+
+ consumerService.onPartitionsRevoked(List.of(partition0));
+ consumerService.commitOffsetsForRevokedPartitions();
+
+ verify(consumer).commitSync(offsetsCaptor.capture());
+ final Map<TopicPartition, OffsetAndMetadata> committedOffsets = offsetsCaptor.getValue();
+
+ assertEquals(1, committedOffsets.size());
+ assertEquals(6L, committedOffsets.get(partition0).offset());
+ assertFalse(committedOffsets.containsKey(partition1));
+ }
+
+ @Test
+ void testCommitOffsetsForRevokedPartitionsTracksMaxOffset() {
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+
+ final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_0, 5L);
+ final ConsumerRecord<byte[], byte[]> record2 = createRecord(TOPIC, PARTITION_0, 7L);
+ final ConsumerRecord<byte[], byte[]> record3 = createRecord(TOPIC, PARTITION_0, 6L);
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+ recordsMap.put(partition0, List.of(record1, record2, record3));
+ final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+ final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+ for (ByteRecord ignored : polledRecords) {
+ }
+
+ consumerService.onPartitionsRevoked(List.of(partition0));
+ consumerService.commitOffsetsForRevokedPartitions();
+
+ verify(consumer).commitSync(offsetsCaptor.capture());
+ final Map<TopicPartition, OffsetAndMetadata> committedOffsets = offsetsCaptor.getValue();
+
+ assertEquals(1, committedOffsets.size());
+ assertEquals(8L, committedOffsets.get(partition0).offset());
+ }
+
+ @Test
+ void testRollbackClearsRevokedPartitions() {
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+
+ final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+ recordsMap.put(partition0, List.of(record0));
+ final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+ when(consumer.assignment()).thenReturn(Collections.singleton(partition0));
+ when(consumer.committed(any())).thenReturn(Collections.singletonMap(partition0, new OffsetAndMetadata(0L)));
+
+ final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+ for (ByteRecord ignored : polledRecords) {
+ }
+
+ consumerService.onPartitionsRevoked(List.of(partition0));
+ assertTrue(consumerService.hasRevokedPartitions());
+
+ consumerService.rollback();
+ assertFalse(consumerService.hasRevokedPartitions());
+
+ consumerService.commitOffsetsForRevokedPartitions();
+ verify(consumer, never()).commitSync(anyMap());
+ }
+
+ @Test
+ void testClearRevokedPartitionsWithoutCommitting() {
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+
+ final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+ recordsMap.put(partition0, List.of(record0));
+ final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+ final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+ for (ByteRecord ignored : polledRecords) {
+ }
+
+ consumerService.onPartitionsRevoked(List.of(partition0));
+ assertTrue(consumerService.hasRevokedPartitions());
+
+ consumerService.clearRevokedPartitions();
+ assertFalse(consumerService.hasRevokedPartitions());
+
+ verify(consumer, never()).commitSync(anyMap());
+ }
+
+ private ConsumerRecord<byte[], byte[]> createRecord(final String topic, final int partition, final long offset) {
+ return new ConsumerRecord<>(
+ topic,
+ partition,
+ offset,
+ System.currentTimeMillis(),
+ TimestampType.CREATE_TIME,
+ 0,
+ 0,
+ null,
+ "test-value".getBytes(),
+ new RecordHeaders(),
+ Optional.empty()
+ );
+ }
+
+ private ConsumerRecords<byte[], byte[]> createConsumerRecords(
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap) {
+ // Calculate next offsets from the records (max offset + 1 for each partition)
+ final Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
+ for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry : recordsMap.entrySet()) {
+ long maxOffset = entry.getValue().stream()
+ .mapToLong(ConsumerRecord::offset)
+ .max()
+ .orElse(-1L);
+ nextOffsets.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1));
+ }
+ return new ConsumerRecords<>(recordsMap, nextOffsets);
+ }
+}