This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 19db2ef9be NIFI-15293 Checkpoint committed records in ConsumeKinesis
(#10600)
19db2ef9be is described below
commit 19db2ef9be41a6599d4b596b685f06788107f32e
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Thu Dec 4 18:25:06 2025 +0100
NIFI-15293 Checkpoint committed records in ConsumeKinesis (#10600)
Signed-off-by: David Handermann <[email protected]>
---
.../aws/kinesis/MemoryBoundRecordBuffer.java | 66 ++++++++++-----
.../aws/kinesis/MemoryBoundRecordBufferTest.java | 98 ++++++++++++++--------
2 files changed, 109 insertions(+), 55 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java
index c20d695263..10b349a81a 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java
@@ -394,7 +394,7 @@ final class MemoryBoundRecordBuffer implements
RecordBuffer.ForKinesisClientLibr
}
private record RecordBatch(List<KinesisClientRecord> records,
- @Nullable RecordProcessorCheckpointer
checkpointer,
+ RecordProcessorCheckpointer checkpointer,
long batchSizeBytes) {
int size() {
return records.size();
@@ -439,11 +439,12 @@ final class MemoryBoundRecordBuffer implements
RecordBuffer.ForKinesisClientLibr
private final long checkpointIntervalMillis;
private volatile long nextCheckpointTimeMillis;
+
/**
- * A last records checkpointer that was ignored due to the checkpoint
interval.
+ * The checkpointer and last record sequence numbers of the most recent checkpoint that was skipped due
to the checkpoint interval.
* If null, the last checkpoint was successful or no checkpoint was
attempted yet.
*/
- private volatile @Nullable RecordProcessorCheckpointer
lastIgnoredCheckpointer = null;
+ private volatile @Nullable LastIgnoredCheckpoint lastIgnoredCheckpoint;
/**
* Queues for managing record batches with their checkpointers in
different states.
@@ -524,13 +525,15 @@ final class MemoryBoundRecordBuffer implements
RecordBuffer.ForKinesisClientLibr
batchesCount.addAndGet(-checkpointedBatches.size());
final RecordProcessorCheckpointer lastBatchCheckpointer =
checkpointedBatches.getLast().checkpointer();
+ final KinesisClientRecord lastRecord =
checkpointedBatches.getLast().records().getLast();
+
if (System.currentTimeMillis() >= nextCheckpointTimeMillis) {
- checkpointSafely(lastBatchCheckpointer);
+ checkpointSequenceNumber(lastBatchCheckpointer,
lastRecord.sequenceNumber(), lastRecord.subSequenceNumber());
nextCheckpointTimeMillis = System.currentTimeMillis() +
checkpointIntervalMillis;
- lastIgnoredCheckpointer = null;
+ lastIgnoredCheckpoint = null;
} else {
// Saving the checkpointer for later, in case shutdown happens
before the next checkpoint.
- lastIgnoredCheckpointer = lastBatchCheckpointer;
+ lastIgnoredCheckpoint = new
LastIgnoredCheckpoint(lastBatchCheckpointer, lastRecord.sequenceNumber(),
lastRecord.subSequenceNumber());
}
final CountDownLatch localEmptyBufferLatch = this.emptyBufferLatch;
@@ -568,7 +571,7 @@ final class MemoryBoundRecordBuffer implements
RecordBuffer.ForKinesisClientLibr
if (batchesCount.get() == 0) {
// Buffer is empty, perform final checkpoint.
- checkpointSafely(checkpointer);
+ checkpointLastReceivedRecord(checkpointer);
return;
}
@@ -588,13 +591,17 @@ final class MemoryBoundRecordBuffer implements
RecordBuffer.ForKinesisClientLibr
}
if (batchesCount.get() == 0) {
- checkpointSafely(checkpointer);
+ checkpointLastReceivedRecord(checkpointer);
} else {
// If there are still records in the buffer, checkpointing
with the latest provided checkpointer is not safe.
// But, if the records were committed without checkpointing in
the past, we can checkpoint them now.
- final RecordProcessorCheckpointer lastCheckpointer =
this.lastIgnoredCheckpointer;
- if (lastCheckpointer != null) {
- checkpointSafely(lastCheckpointer);
+ final LastIgnoredCheckpoint ignoredCheckpoint =
this.lastIgnoredCheckpoint;
+ if (ignoredCheckpoint != null) {
+ checkpointSequenceNumber(
+ ignoredCheckpoint.checkpointer(),
+ ignoredCheckpoint.sequenceNumber(),
+ ignoredCheckpoint.subSequenceNumber()
+ );
}
}
}
@@ -629,22 +636,28 @@ final class MemoryBoundRecordBuffer implements
RecordBuffer.ForKinesisClientLibr
return invalidated.get() || batchesCount.get() == 0;
}
+ private void checkpointLastReceivedRecord(final
RecordProcessorCheckpointer checkpointer) {
+ logger.debug("Performing checkpoint for buffer with id {}.
Checkpointing the last received record", bufferId);
+
+ checkpointSafely(checkpointer::checkpoint);
+ }
+
+ private void checkpointSequenceNumber(final
RecordProcessorCheckpointer checkpointer, final String sequenceNumber, final
long subSequenceNumber) {
+ logger.debug("Performing checkpoint for buffer with id {}.
Sequence number: [{}], sub sequence number: [{}]",
+ bufferId, sequenceNumber, subSequenceNumber);
+
+ checkpointSafely(() -> checkpointer.checkpoint(sequenceNumber,
subSequenceNumber));
+ }
+
/**
* Performs checkpointing using exponential backoff and jitter, if
needed.
*
- * @param checkpointer the checkpointer to use.
+ * @param checkpointAction the action which performs the checkpointing.
*/
- private void checkpointSafely(final @Nullable
RecordProcessorCheckpointer checkpointer) {
- if (checkpointer == null) {
- logger.warn("Attempting to checkpoint records with a null
checkpointer. Ignoring checkpoint");
- return;
- }
-
- logger.debug("Performing checkpoint for buffer with id {}",
bufferId);
-
+ private void checkpointSafely(final CheckpointAction checkpointAction)
{
for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
try {
- checkpointer.checkpoint();
+ checkpointAction.doCheckpoint();
if (attempt > 1) {
logger.debug("Checkpoint succeeded on attempt {}",
attempt);
}
@@ -680,5 +693,16 @@ final class MemoryBoundRecordBuffer implements
RecordBuffer.ForKinesisClientLibr
final long jitterMillis = RANDOM.nextLong(baseDelayMillis / 4); //
Up to 25% jitter.
return baseDelayMillis + jitterMillis;
}
+
+ private interface CheckpointAction {
+
+ /**
+ * Throws the same set of exceptions as {@link
RecordProcessorCheckpointer#checkpoint()} and {@link
RecordProcessorCheckpointer#checkpoint(String, long)}.
+ */
+ void doCheckpoint() throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
+ }
+
+ private record LastIgnoredCheckpoint(RecordProcessorCheckpointer
checkpointer, String sequenceNumber, long subSequenceNumber) {
+ }
}
}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java
index 608d6f6cc4..9c16b6b18a 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
@@ -54,7 +53,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -129,7 +127,7 @@ class MemoryBoundRecordBufferTest {
final List<KinesisClientRecord> consumedRecords =
recordBuffer.consumeRecords(lease);
assertEquals(records, consumedRecords);
// Just consuming record should not checkpoint them.
- assertFalse(checkpointer1.isCheckpointed());
+ assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER,
checkpointer1.latestCheckpointedSequenceNumber());
}
@Test
@@ -143,7 +141,27 @@ class MemoryBoundRecordBufferTest {
recordBuffer.consumeRecords(lease);
recordBuffer.commitConsumedRecords(lease);
- assertTrue(checkpointer1.isCheckpointed());
+ assertEquals(records.getLast().sequenceNumber(),
checkpointer1.latestCheckpointedSequenceNumber());
+ }
+
+ @Test
+ void testCommitConsumedRecords_withRecordsAddedBeforeCommit() {
+ final ShardBufferId bufferId = recordBuffer.createBuffer(SHARD_ID_1);
+ final List<KinesisClientRecord> originalRecords = createTestRecords(2);
+
+ recordBuffer.addRecords(bufferId, originalRecords, checkpointer1);
+ final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
+
+ recordBuffer.consumeRecords(lease);
+
+ // Simulating new records added in parallel, before a commit.
+ final List<KinesisClientRecord> newRecords = createTestRecords(5);
+ recordBuffer.addRecords(bufferId, newRecords, checkpointer1);
+
+ recordBuffer.commitConsumedRecords(lease);
+
+ // Only originalRecords, which were consumed, are checkpointed.
+ assertEquals(originalRecords.getLast().sequenceNumber(),
checkpointer1.latestCheckpointedSequenceNumber());
}
@Test
@@ -162,7 +180,7 @@ class MemoryBoundRecordBufferTest {
recordBuffer.rollbackConsumedRecords(lease);
// Checkpointer should not be called during rollback.
- assertFalse(checkpointer1.isCheckpointed());
+ assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER,
checkpointer1.latestCheckpointedSequenceNumber());
final List<String> rolledBackMessages =
recordBuffer.consumeRecords(lease).stream()
.map(this::readContent)
@@ -278,7 +296,7 @@ class MemoryBoundRecordBufferTest {
// Should not be able to commit records for invalidated buffer.
recordBuffer.commitConsumedRecords(lease);
- assertFalse(checkpointer1.isCheckpointed());
+ assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER,
checkpointer1.latestCheckpointedSequenceNumber());
// Buffer should not be available in pool.
assertTrue(recordBuffer.acquireBufferLease().isEmpty());
@@ -296,7 +314,7 @@ class MemoryBoundRecordBufferTest {
recordBuffer.commitConsumedRecords(lease);
recordBuffer.checkpointEndedShard(bufferId, checkpointer2);
- assertTrue(checkpointer2.isCheckpointed());
+ assertEquals(TestCheckpointer.LATEST_SEQUENCE_NUMBER,
checkpointer2.latestCheckpointedSequenceNumber());
// Buffer should be removed and not available for operations.
final List<KinesisClientRecord> consumedRecords =
recordBuffer.consumeRecords(lease);
@@ -315,7 +333,7 @@ class MemoryBoundRecordBufferTest {
recordBuffer.commitConsumedRecords(lease);
recordBuffer.shutdownShardConsumption(bufferId, checkpointer2);
- assertTrue(checkpointer2.isCheckpointed());
+ assertEquals(TestCheckpointer.LATEST_SEQUENCE_NUMBER,
checkpointer2.latestCheckpointedSequenceNumber());
// Buffer should be removed and not available for operations.
final List<KinesisClientRecord> consumedRecords =
recordBuffer.consumeRecords(lease);
@@ -330,8 +348,8 @@ class MemoryBoundRecordBufferTest {
recordBuffer.addRecords(bufferId, records, checkpointer1);
recordBuffer.shutdownShardConsumption(bufferId, checkpointer2);
- assertFalse(checkpointer1.isCheckpointed());
- assertFalse(checkpointer2.isCheckpointed());
+ assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER,
checkpointer1.latestCheckpointedSequenceNumber());
+ assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER,
checkpointer2.latestCheckpointedSequenceNumber());
assertTrue(recordBuffer.acquireBufferLease().isEmpty(), "Buffer should
not be available after shutdown");
}
@@ -466,8 +484,11 @@ class MemoryBoundRecordBufferTest {
executor.shutdown();
assertAll(
- checkpointers.stream()
- .map(it -> () -> assertTrue(it.isCheckpointed(),
"Every checkpointer should have been called"))
+ checkpointers.stream().map(it -> () ->
+ assertNotEquals(
+ TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER,
+ it.latestCheckpointedSequenceNumber(),
+ "Every checkpointer should have been called"))
);
final long uniqueRecordsProcessed = processedRecordsFutures.stream()
@@ -550,7 +571,7 @@ class MemoryBoundRecordBufferTest {
// Should handle all exception types gracefully.
recordBuffer.commitConsumedRecords(lease);
- assertTrue(failingCheckpointer.isCheckpointed());
+ assertEquals(records.getLast().sequenceNumber(),
failingCheckpointer.latestCheckpointedSequenceNumber());
}
@Test
@@ -566,7 +587,7 @@ class MemoryBoundRecordBufferTest {
recordBuffer.consumeRecords(lease);
recordBuffer.commitConsumedRecords(lease);
- assertFalse(failingCheckpointer.isCheckpointed());
+ assertEquals(TestCheckpointer.NO_CHECKPOINT_SEQUENCE_NUMBER,
failingCheckpointer.latestCheckpointedSequenceNumber());
}
@Test
@@ -596,10 +617,13 @@ class MemoryBoundRecordBufferTest {
// Commit records to unblock finishConsumption.
recordBuffer.commitConsumedRecords(lease);
- assertTrue(checkpointer1.isCheckpointed());
+ assertEquals(records.getLast().sequenceNumber(),
checkpointer1.latestCheckpointedSequenceNumber());
finishThread.join();
- assertTrue(checkpointer2.isCheckpointed(), "Checkpointer should be
called after finishConsumption unblocks");
+ assertEquals(
+ TestCheckpointer.LATEST_SEQUENCE_NUMBER,
+ checkpointer2.latestCheckpointedSequenceNumber(),
+ "Checkpointer should be called after finishConsumption
unblocks");
}
private List<KinesisClientRecord> createTestRecords(int count) {
@@ -632,10 +656,14 @@ class MemoryBoundRecordBufferTest {
*/
private static class TestCheckpointer implements
RecordProcessorCheckpointer {
- private final AtomicBoolean checkpointed = new AtomicBoolean(false);
+ static final String NO_CHECKPOINT_SEQUENCE_NUMBER = "NONE";
+ static final String LATEST_SEQUENCE_NUMBER = "LATEST";
+
private final Exception exceptionToThrow;
private final AtomicInteger throwsLeft;
+ private volatile String latestCheckpointedSequenceNumber =
NO_CHECKPOINT_SEQUENCE_NUMBER;
+
TestCheckpointer() {
this.exceptionToThrow = null;
this.throwsLeft = new AtomicInteger(0);
@@ -648,19 +676,7 @@ class MemoryBoundRecordBufferTest {
@Override
public void checkpoint() throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException {
- if (exceptionToThrow != null && throwsLeft.decrementAndGet() == 0)
{
- switch (exceptionToThrow) {
- case KinesisClientLibDependencyException e -> throw e;
- case InvalidStateException e -> throw e;
- case ThrottlingException e -> throw e;
- case ShutdownException e -> throw e;
- default -> throw new RuntimeException(exceptionToThrow);
- }
- }
-
- if (checkpointed.getAndSet(true)) {
- throw new IllegalStateException("TestCheckpointer has already
been checkpointed");
- }
+ doCheckpoint(LATEST_SEQUENCE_NUMBER);
}
@Override
@@ -674,8 +690,22 @@ class MemoryBoundRecordBufferTest {
}
@Override
- public void checkpoint(String sequenceNumber, long subSequenceNumber) {
- throw notImplemented();
+ public void checkpoint(String sequenceNumber, long subSequenceNumber)
throws ShutdownException, InvalidStateException {
+ doCheckpoint(sequenceNumber);
+ }
+
+ private void doCheckpoint(final String sequenceNumber) throws
InvalidStateException, ShutdownException {
+ if (exceptionToThrow != null && throwsLeft.decrementAndGet() == 0)
{
+ switch (exceptionToThrow) {
+ case KinesisClientLibDependencyException e -> throw e;
+ case InvalidStateException e -> throw e;
+ case ThrottlingException e -> throw e;
+ case ShutdownException e -> throw e;
+ default -> throw new RuntimeException(exceptionToThrow);
+ }
+ }
+
+ latestCheckpointedSequenceNumber = sequenceNumber;
}
@Override
@@ -723,8 +753,8 @@ class MemoryBoundRecordBufferTest {
throw notImplemented();
}
- boolean isCheckpointed() {
- return checkpointed.get();
+ String latestCheckpointedSequenceNumber() {
+ return latestCheckpointedSequenceNumber;
}
private static RuntimeException notImplemented() {