This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 19db2ef9be NIFI-15293 Checkpoint committed records in ConsumeKinesis (#10600)
19db2ef9be is described below

commit 19db2ef9be41a6599d4b596b685f06788107f32e
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Thu Dec 4 18:25:06 2025 +0100

    NIFI-15293 Checkpoint committed records in ConsumeKinesis (#10600)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../aws/kinesis/MemoryBoundRecordBuffer.java       | 66 ++++++++++-----
 .../aws/kinesis/MemoryBoundRecordBufferTest.java   | 98 ++++++++++++++--------
 2 files changed, 109 insertions(+), 55 deletions(-)

diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java
index c20d695263..10b349a81a 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java
@@ -394,7 +394,7 @@ final class MemoryBoundRecordBuffer implements 
RecordBuffer.ForKinesisClientLibr
     }
 
     private record RecordBatch(List<KinesisClientRecord> records,
-                               @Nullable RecordProcessorCheckpointer 
checkpointer,
+                               RecordProcessorCheckpointer checkpointer,
                                long batchSizeBytes) {
         int size() {
             return records.size();
@@ -439,11 +439,12 @@ final class MemoryBoundRecordBuffer implements 
RecordBuffer.ForKinesisClientLibr
 
         private final long checkpointIntervalMillis;
         private volatile long nextCheckpointTimeMillis;
+
         /**
-         * A last records checkpointer that was ignored due to the checkpoint 
interval.
+         * A last record checkpointer and sequence number that was ignored due 
to the checkpoint interval.
          * If null, the last checkpoint was successful or no checkpoint was 
attempted yet.
          */
-        private volatile @Nullable RecordProcessorCheckpointer 
lastIgnoredCheckpointer = null;
+        private volatile @Nullable LastIgnoredCheckpoint lastIgnoredCheckpoint;
 
         /**
          * Queues for managing record batches with their checkpointers in 
different states.
@@ -524,13 +525,15 @@ final class MemoryBoundRecordBuffer implements 
RecordBuffer.ForKinesisClientLibr
             batchesCount.addAndGet(-checkpointedBatches.size());
 
             final RecordProcessorCheckpointer lastBatchCheckpointer = 
checkpointedBatches.getLast().checkpointer();
+            final KinesisClientRecord lastRecord = 
checkpointedBatches.getLast().records().getLast();
+
             if (System.currentTimeMillis() >= nextCheckpointTimeMillis) {
-                checkpointSafely(lastBatchCheckpointer);
+                checkpointSequenceNumber(lastBatchCheckpointer, 
lastRecord.sequenceNumber(), lastRecord.subSequenceNumber());
                 nextCheckpointTimeMillis = System.currentTimeMillis() + 
checkpointIntervalMillis;
-                lastIgnoredCheckpointer = null;
+                lastIgnoredCheckpoint = null;
             } else {
                 // Saving the checkpointer for later, in case shutdown happens 
before the next checkpoint.
-                lastIgnoredCheckpointer = lastBatchCheckpointer;
+                lastIgnoredCheckpoint = new 
LastIgnoredCheckpoint(lastBatchCheckpointer, lastRecord.sequenceNumber(), 
lastRecord.subSequenceNumber());
             }
 
             final CountDownLatch localEmptyBufferLatch = this.emptyBufferLatch;
@@ -568,7 +571,7 @@ final class MemoryBoundRecordBuffer implements 
RecordBuffer.ForKinesisClientLibr
 
                 if (batchesCount.get() == 0) {
                     // Buffer is empty, perform final checkpoint.
-                    checkpointSafely(checkpointer);
+                    checkpointLastReceivedRecord(checkpointer);
                     return;
                 }
 
@@ -588,13 +591,17 @@ final class MemoryBoundRecordBuffer implements 
RecordBuffer.ForKinesisClientLibr
             }
 
             if (batchesCount.get() == 0) {
-                checkpointSafely(checkpointer);
+                checkpointLastReceivedRecord(checkpointer);
             } else {
                 // If there are still records in the buffer, checkpointing 
with the latest provided checkpointer is not safe.
                 // But, if the records were committed without checkpointing in 
the past, we can checkpoint them now.
-                final RecordProcessorCheckpointer lastCheckpointer = 
this.lastIgnoredCheckpointer;
-                if (lastCheckpointer != null) {
-                    checkpointSafely(lastCheckpointer);
+                final LastIgnoredCheckpoint ignoredCheckpoint = 
this.lastIgnoredCheckpoint;
+                if (ignoredCheckpoint != null) {
+                    checkpointSequenceNumber(
+                            ignoredCheckpoint.checkpointer(),
+                            ignoredCheckpoint.sequenceNumber(),
+                            ignoredCheckpoint.subSequenceNumber()
+                    );
                 }
             }
         }
@@ -629,22 +636,28 @@ final class MemoryBoundRecordBuffer implements 
RecordBuffer.ForKinesisClientLibr
             return invalidated.get() || batchesCount.get() == 0;
         }
 
+        private void checkpointLastReceivedRecord(final 
RecordProcessorCheckpointer checkpointer) {
+            logger.debug("Performing checkpoint for buffer with id {}. 
Checkpointing the last received record", bufferId);
+
+            checkpointSafely(checkpointer::checkpoint);
+        }
+
+        private void checkpointSequenceNumber(final 
RecordProcessorCheckpointer checkpointer, final String sequenceNumber, final 
long subSequenceNumber) {
+            logger.debug("Performing checkpoint for buffer with id {}. 
Sequence number: [{}], sub sequence number: [{}]",
+                    bufferId, sequenceNumber, subSequenceNumber);
+
+            checkpointSafely(() -> checkpointer.checkpoint(sequenceNumber, 
subSequenceNumber));
+        }
+
         /**
          * Performs checkpointing using exponential backoff and jitter, if 
needed.
          *
-         * @param checkpointer the checkpointer to use.
+         * @param checkpointAction the action which performs the checkpointing.
          */
-        private void checkpointSafely(final @Nullable 
RecordProcessorCheckpointer checkpointer) {
-            if (checkpointer == null) {
-                logger.warn("Attempting to checkpoint records with a null 
checkpointer. Ignoring checkpoint");
-                return;
-            }
-
-            logger.debug("Performing checkpoint for buffer with id {}", 
bufferId);
-
+        private void checkpointSafely(final CheckpointAction checkpointAction) 
{
             for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
                 try {
-                    checkpointer.checkpoint();
+                    checkpointAction.doCheckpoint();
                     if (attempt > 1) {
                         logger.debug("Checkpoint succeeded on attempt {}", 
attempt);
                     }
@@ -680,5 +693,16 @@ final class MemoryBoundRecordBuffer implements 
RecordBuffer.ForKinesisClientLibr
             final long jitterMillis = RANDOM.nextLong(baseDelayMillis / 4); // 
Up to 25% jitter.
             return baseDelayMillis + jitterMillis;
         }
+
+        private interface CheckpointAction {
+
+            /**
+             * Throws the same set of exceptions as {@link 
RecordProcessorCheckpointer#checkpoint()} and {@link 
RecordProcessorCheckpointer#checkpoint(String, long)}.
+             */
+            void doCheckpoint() throws KinesisClientLibDependencyException, 
InvalidStateException, ThrottlingException, ShutdownException, 
IllegalArgumentException;
+        }
+
+        private record LastIgnoredCheckpoint(RecordProcessorCheckpointer 
checkpointer, String sequenceNumber, long subSequenceNumber) {
+        }
     }
 }
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java
index 608d6f6cc4..9c16b6b18a 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
@@ -54,7 +53,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.junit.jupiter.api.Assertions.assertAll;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -129,7 +127,7 @@ class MemoryBoundRecordBufferTest {
         final List<KinesisClientRecord> consumedRecords = 
recordBuffer.consumeRecords(lease);
         assertEquals(records, consumedRecords);
         // Just consuming record should not checkpoint them.
-        assertFalse(checkpointer1.isCheckpointed());
+        assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER, 
checkpointer1.latestCheckpointedSequenceNumber());
     }
 
     @Test
@@ -143,7 +141,27 @@ class MemoryBoundRecordBufferTest {
         recordBuffer.consumeRecords(lease);
         recordBuffer.commitConsumedRecords(lease);
 
-        assertTrue(checkpointer1.isCheckpointed());
+        assertEquals(records.getLast().sequenceNumber(), 
checkpointer1.latestCheckpointedSequenceNumber());
+    }
+
+    @Test
+    void testCommitConsumedRecords_withRecordsAddedBeforeCommit() {
+        final ShardBufferId bufferId = recordBuffer.createBuffer(SHARD_ID_1);
+        final List<KinesisClientRecord> originalRecords = createTestRecords(2);
+
+        recordBuffer.addRecords(bufferId, originalRecords, checkpointer1);
+        final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
+
+        recordBuffer.consumeRecords(lease);
+
+        // Simulating new records added in parallel, before a commit.
+        final List<KinesisClientRecord> newRecords = createTestRecords(5);
+        recordBuffer.addRecords(bufferId, newRecords, checkpointer1);
+
+        recordBuffer.commitConsumedRecords(lease);
+
+        // Only originalRecords, which were consumed, are checkpointed.
+        assertEquals(originalRecords.getLast().sequenceNumber(), 
checkpointer1.latestCheckpointedSequenceNumber());
     }
 
     @Test
@@ -162,7 +180,7 @@ class MemoryBoundRecordBufferTest {
         recordBuffer.rollbackConsumedRecords(lease);
 
         // Checkpointer should not be called during rollback.
-        assertFalse(checkpointer1.isCheckpointed());
+        assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER, 
checkpointer1.latestCheckpointedSequenceNumber());
 
         final List<String> rolledBackMessages = 
recordBuffer.consumeRecords(lease).stream()
                 .map(this::readContent)
@@ -278,7 +296,7 @@ class MemoryBoundRecordBufferTest {
 
         // Should not be able to commit records for invalidated buffer.
         recordBuffer.commitConsumedRecords(lease);
-        assertFalse(checkpointer1.isCheckpointed());
+        assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER, 
checkpointer1.latestCheckpointedSequenceNumber());
 
         // Buffer should not be available in pool.
         assertTrue(recordBuffer.acquireBufferLease().isEmpty());
@@ -296,7 +314,7 @@ class MemoryBoundRecordBufferTest {
         recordBuffer.commitConsumedRecords(lease);
 
         recordBuffer.checkpointEndedShard(bufferId, checkpointer2);
-        assertTrue(checkpointer2.isCheckpointed());
+        assertEquals(TestCheckpointer.LATEST_SEQUENCE_NUMBER, 
checkpointer2.latestCheckpointedSequenceNumber());
 
         // Buffer should be removed and not available for operations.
         final List<KinesisClientRecord> consumedRecords = 
recordBuffer.consumeRecords(lease);
@@ -315,7 +333,7 @@ class MemoryBoundRecordBufferTest {
         recordBuffer.commitConsumedRecords(lease);
 
         recordBuffer.shutdownShardConsumption(bufferId, checkpointer2);
-        assertTrue(checkpointer2.isCheckpointed());
+        assertEquals(TestCheckpointer.LATEST_SEQUENCE_NUMBER, 
checkpointer2.latestCheckpointedSequenceNumber());
 
         // Buffer should be removed and not available for operations.
         final List<KinesisClientRecord> consumedRecords = 
recordBuffer.consumeRecords(lease);
@@ -330,8 +348,8 @@ class MemoryBoundRecordBufferTest {
         recordBuffer.addRecords(bufferId, records, checkpointer1);
 
         recordBuffer.shutdownShardConsumption(bufferId, checkpointer2);
-        assertFalse(checkpointer1.isCheckpointed());
-        assertFalse(checkpointer2.isCheckpointed());
+        assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER, 
checkpointer1.latestCheckpointedSequenceNumber());
+        assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER, 
checkpointer2.latestCheckpointedSequenceNumber());
 
         assertTrue(recordBuffer.acquireBufferLease().isEmpty(), "Buffer should 
not be available after shutdown");
     }
@@ -466,8 +484,11 @@ class MemoryBoundRecordBufferTest {
         executor.shutdown();
 
         assertAll(
-                checkpointers.stream()
-                        .map(it -> () -> assertTrue(it.isCheckpointed(), 
"Every checkpointer should have been called"))
+                checkpointers.stream().map(it -> () ->
+                        assertNotEquals(
+                                TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER,
+                                it.latestCheckpointedSequenceNumber(),
+                                "Every checkpointer should have been called"))
         );
 
         final long uniqueRecordsProcessed = processedRecordsFutures.stream()
@@ -550,7 +571,7 @@ class MemoryBoundRecordBufferTest {
         // Should handle all exception types gracefully.
         recordBuffer.commitConsumedRecords(lease);
 
-        assertTrue(failingCheckpointer.isCheckpointed());
+        assertEquals(records.getLast().sequenceNumber(), 
failingCheckpointer.latestCheckpointedSequenceNumber());
     }
 
     @Test
@@ -566,7 +587,7 @@ class MemoryBoundRecordBufferTest {
         recordBuffer.consumeRecords(lease);
 
         recordBuffer.commitConsumedRecords(lease);
-        assertFalse(failingCheckpointer.isCheckpointed());
+        assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER, 
failingCheckpointer.latestCheckpointedSequenceNumber());
     }
 
     @Test
@@ -596,10 +617,13 @@ class MemoryBoundRecordBufferTest {
 
         // Commit records to unblock finishConsumption.
         recordBuffer.commitConsumedRecords(lease);
-        assertTrue(checkpointer1.isCheckpointed());
+        assertEquals(records.getLast().sequenceNumber(), 
checkpointer1.latestCheckpointedSequenceNumber());
 
         finishThread.join();
-        assertTrue(checkpointer2.isCheckpointed(), "Checkpointer should be 
called after finishConsumption unblocks");
+        assertEquals(
+                TestCheckpointer.LATEST_SEQUENCE_NUMBER,
+                checkpointer2.latestCheckpointedSequenceNumber(),
+                "Checkpointer should be called after finishConsumption 
unblocks");
     }
 
     private List<KinesisClientRecord> createTestRecords(int count) {
@@ -632,10 +656,14 @@ class MemoryBoundRecordBufferTest {
      */
     private static class TestCheckpointer implements 
RecordProcessorCheckpointer {
 
-        private final AtomicBoolean checkpointed = new AtomicBoolean(false);
+        static final String NO_CHECKPOINT_SEQUENCE_NUMBER = "NONE";
+        static final String LATEST_SEQUENCE_NUMBER = "LATEST";
+
         private final Exception exceptionToThrow;
         private final AtomicInteger throwsLeft;
 
+        private volatile String latestCheckpointedSequenceNumber = 
NO_CHECKPOINT_SEQUENCE_NUMBER;
+
         TestCheckpointer() {
             this.exceptionToThrow = null;
             this.throwsLeft = new AtomicInteger(0);
@@ -648,19 +676,7 @@ class MemoryBoundRecordBufferTest {
 
         @Override
         public void checkpoint() throws KinesisClientLibDependencyException, 
InvalidStateException, ThrottlingException, ShutdownException {
-            if (exceptionToThrow != null && throwsLeft.decrementAndGet() == 0) 
{
-                switch (exceptionToThrow) {
-                    case KinesisClientLibDependencyException e -> throw e;
-                    case InvalidStateException e -> throw e;
-                    case ThrottlingException e -> throw e;
-                    case ShutdownException e -> throw e;
-                    default -> throw new RuntimeException(exceptionToThrow);
-                }
-            }
-
-            if (checkpointed.getAndSet(true)) {
-                throw new IllegalStateException("TestCheckpointer has already 
been checkpointed");
-            }
+            doCheckpoint(LATEST_SEQUENCE_NUMBER);
         }
 
         @Override
@@ -674,8 +690,22 @@ class MemoryBoundRecordBufferTest {
         }
 
         @Override
-        public void checkpoint(String sequenceNumber, long subSequenceNumber) {
-            throw notImplemented();
+        public void checkpoint(String sequenceNumber, long subSequenceNumber) 
throws ShutdownException, InvalidStateException {
+            doCheckpoint(sequenceNumber);
+        }
+
+        private void doCheckpoint(final String sequenceNumber) throws 
InvalidStateException, ShutdownException {
+            if (exceptionToThrow != null && throwsLeft.decrementAndGet() == 0) 
{
+                switch (exceptionToThrow) {
+                    case KinesisClientLibDependencyException e -> throw e;
+                    case InvalidStateException e -> throw e;
+                    case ThrottlingException e -> throw e;
+                    case ShutdownException e -> throw e;
+                    default -> throw new RuntimeException(exceptionToThrow);
+                }
+            }
+
+            latestCheckpointedSequenceNumber = sequenceNumber;
         }
 
         @Override
@@ -723,8 +753,8 @@ class MemoryBoundRecordBufferTest {
             throw notImplemented();
         }
 
-        boolean isCheckpointed() {
-            return checkpointed.get();
+        String latestCheckpointedSequenceNumber() {
+            return latestCheckpointedSequenceNumber;
         }
 
         private static RuntimeException notImplemented() {

Reply via email to