awelless commented on code in PR #10600:
URL: https://github.com/apache/nifi/pull/10600#discussion_r2588867820


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -680,5 +693,16 @@ private long calculateRetryDelay(final int attempt) {
             final long jitterMillis = RANDOM.nextLong(baseDelayMillis / 4); // 
Up to 25% jitter.
             return baseDelayMillis + jitterMillis;
         }
+
+        private interface CheckpointAction {
+
+            /**
+             * Throws the same set of exceptions as {@link 
RecordProcessorCheckpointer#checkpoint()} and {@link 
RecordProcessorCheckpointer#checkpoint(String, long)}.
+             */
+            void doCheckpoint() throws KinesisClientLibDependencyException, 
InvalidStateException, ThrottlingException, ShutdownException, 
IllegalArgumentException;
+        }
+
+        private record LastIgnoredCheckpoint(RecordProcessorCheckpointer 
checkpointer, String sequenceNumber, long subSequenceNumber) {

Review Comment:
   In the actual KCL implementation the same stateful 
`RecordProcessorCheckpointer` is passed to each processor's method, however the 
API doesn't call this out explicitly.
   In case this changes in the future, it's better to keep a reference to the 
checkpointer which was passed with a batch of records.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to