RobertIndie commented on code in PR #24534:
URL: https://github.com/apache/pulsar/pull/24534#discussion_r2218748567


##########
pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java:
##########
@@ -39,120 +44,178 @@
 @Slf4j
 public class KinesisRecordProcessor implements ShardRecordProcessor {
 
+    private record CheckpointSequenceNumber(String sequenceNumber, long 
subSequenceNumber) {}
+
     private final int numRetries;
     private final long checkpointInterval;
     private final long backoffTime;
-
     private final LinkedBlockingQueue<KinesisRecord> queue;
     private final SourceContext sourceContext;
     private final Set<String> propertiesToInclude;
-
-    private long nextCheckpointTimeInNanos;
+    private final ScheduledExecutorService checkpointExecutor;
+    private final AtomicReference<RecordProcessorCheckpointer> checkpointerRef 
= new AtomicReference<>();
+    private final AtomicBoolean isCheckpointing = new AtomicBoolean(false);
     private String kinesisShardId;
-    private volatile String sequenceNumberToCheckpoint = null;
-    private String lastCheckpointedSequenceNumber = null;
+    private final AtomicInteger numRecordsInFlight = new AtomicInteger(0);
+
+    private volatile CheckpointSequenceNumber sequenceNumberNeedToCheckpoint = 
null;
+    private volatile CheckpointSequenceNumber lastCheckpointSequenceNumber = 
null;
 
     public KinesisRecordProcessor(LinkedBlockingQueue<KinesisRecord> queue, 
KinesisSourceConfig config,
-                                  SourceContext sourceContext) {
+                                  SourceContext sourceContext, 
ScheduledExecutorService checkpointExecutor) {
         this.queue = queue;
         this.checkpointInterval = config.getCheckpointInterval();
         this.numRetries = config.getNumRetries();
         this.backoffTime = config.getBackoffTime();
         this.propertiesToInclude = config.getPropertiesToInclude();
         this.sourceContext = sourceContext;
+        this.checkpointExecutor = checkpointExecutor;
     }
 
-    private void checkpoint(RecordProcessorCheckpointer checkpointer, String 
sequenceNumber) {
-        log.info("Checkpointing shard {} at sequence number {}", 
kinesisShardId, sequenceNumber);
-        for (int i = 0; i < numRetries; i++) {
-            try {
-                checkpointer.checkpoint(sequenceNumber);
-                lastCheckpointedSequenceNumber = sequenceNumber;
-                break;
-            } catch (ShutdownException se) {
-                log.info("Caught shutdown exception, skipping checkpoint.", 
se);
-                sourceContext.fatal(se);
-                break;
-            } catch (InvalidStateException e) {
-                log.error("Cannot save checkpoint to the DynamoDB table.", e);
+    private void tryCheckpointWithRetry(RecordProcessorCheckpointer 
checkpointer,
+                                        CheckpointSequenceNumber checkpoint, 
int attempt) {
+        try {
+            log.info("Attempting checkpoint {}/{} for shard {} at {}. 
In-flight records: {}",
+                    attempt, numRetries, kinesisShardId, checkpoint, 
numRecordsInFlight.get());
+            checkpointer.checkpoint(checkpoint.sequenceNumber(), 
checkpoint.subSequenceNumber());
+            lastCheckpointSequenceNumber = checkpoint;
+            log.info("Successfully checkpointed shard {} at {}", 
kinesisShardId, checkpoint);
+            isCheckpointing.set(false);
+            checkpointExecutor.schedule(this::triggerCheckpoint, 
checkpointInterval, TimeUnit.MILLISECONDS);
+        } catch (ShutdownException | InvalidStateException e) {
+            log.error("Caught a non-retryable exception for shard {} during 
checkpoint at {}. Terminating.",
+                    kinesisShardId, checkpoint, e);
+            sourceContext.fatal(e);
+        } catch (ThrottlingException | KinesisClientLibDependencyException e) {
+            if (attempt >= numRetries) {
+                log.error("Checkpoint for shard {} failed after {} attempts at 
{}. Terminating.",
+                        kinesisShardId, numRetries, checkpoint, e);
                 sourceContext.fatal(e);
-                break;
-            } catch (ThrottlingException | KinesisClientLibDependencyException 
e) {
-                if (i >= (numRetries - 1)) {
-                    log.error("Checkpoint failed after {} attempts.", (i + 1), 
e);
-                    sourceContext.fatal(e);
-                    break;
-                }
-            }
-
-            try {
-                Thread.sleep(backoffTime);
-            } catch (InterruptedException e) {
-                log.debug("Interrupted sleep", e);
+            } else {
+                log.warn("Throttling/Dependency error on checkpoint for shard 
{} at {}. Scheduling retry {} "
+                                + "after {}ms.", kinesisShardId, checkpoint, 
attempt + 1, backoffTime);
+                checkpointExecutor.schedule(() -> 
tryCheckpointWithRetry(checkpointer, checkpoint, attempt + 1),
+                        backoffTime, TimeUnit.MILLISECONDS);
             }
         }
     }
 
-    public void updateSequenceNumberToCheckpoint(String sequenceNumber) {
-        this.sequenceNumberToCheckpoint = sequenceNumber;
+    public void updateSequenceNumberToCheckpoint(String sequenceNumber, long 
subSequenceNumber) {
+        CheckpointSequenceNumber newCheckpoint = new 
CheckpointSequenceNumber(sequenceNumber, subSequenceNumber);
+        log.debug("{} Updating sequence number to checkpoint {}", 
kinesisShardId, newCheckpoint);
+        this.sequenceNumberNeedToCheckpoint = newCheckpoint;
+        this.numRecordsInFlight.decrementAndGet();
     }
 
     public void failed() {
+        numRecordsInFlight.decrementAndGet();
         sourceContext.fatal(new PulsarClientException("Failed to process 
Kinesis records due send to pulsar topic"));
     }
 
     @Override
     public void initialize(InitializationInput initializationInput) {
         kinesisShardId = initializationInput.shardId();
-        log.info("Initializing KinesisRecordProcessor for shard {}. Config: 
checkpointInterval={}ms, numRetries={}, "
-                        + "backoffTime={}ms, propertiesToInclude={}",
-                kinesisShardId, checkpointInterval, numRetries, backoffTime, 
propertiesToInclude);
-        nextCheckpointTimeInNanos = System.nanoTime() + checkpointInterval;
+        log.info("Initializing KinesisRecordProcessor for shard {}, 
extendedSequenceNumber: {}, pendingCheckSeq: {}",
+                kinesisShardId, initializationInput.extendedSequenceNumber(),
+                initializationInput.pendingCheckpointSequenceNumber());
+        checkpointExecutor.schedule(this::triggerCheckpoint, 
checkpointInterval, TimeUnit.MILLISECONDS);
+    }
+
+    private void triggerCheckpoint() {
+        if (isCheckpointing.compareAndSet(false, true)) {
+            final RecordProcessorCheckpointer checkpointer = 
checkpointerRef.get();
+            final CheckpointSequenceNumber currentCheckpoint = 
this.sequenceNumberNeedToCheckpoint;
+
+            if (checkpointer != null && currentCheckpoint != null && 
!currentCheckpoint.equals(
+                    lastCheckpointSequenceNumber)) {
+                tryCheckpointWithRetry(checkpointer, currentCheckpoint, 1);
+            } else {
+                isCheckpointing.set(false);
+                checkpointExecutor.schedule(this::triggerCheckpoint, 
checkpointInterval, TimeUnit.MILLISECONDS);
+            }
+        }

Review Comment:
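   We should add a try-catch to this method, or inside
`tryCheckpointWithRetry`, to catch any unexpected runtime exception or
throwable (e.g. an NPE). If an exception escapes a task running on the
`ScheduledExecutorService`, it is not logged, which makes the failure hard
to debug. Here it would also leave `isCheckpointing` set to `true` and skip
rescheduling `triggerCheckpoint`, so no further checkpoints would run for
the shard.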



##########
pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java:
##########
@@ -39,120 +44,178 @@
 @Slf4j
 public class KinesisRecordProcessor implements ShardRecordProcessor {
 
+    private record CheckpointSequenceNumber(String sequenceNumber, long 
subSequenceNumber) {}
+
     private final int numRetries;
     private final long checkpointInterval;
     private final long backoffTime;
-
     private final LinkedBlockingQueue<KinesisRecord> queue;
     private final SourceContext sourceContext;
     private final Set<String> propertiesToInclude;
-
-    private long nextCheckpointTimeInNanos;
+    private final ScheduledExecutorService checkpointExecutor;
+    private final AtomicReference<RecordProcessorCheckpointer> checkpointerRef 
= new AtomicReference<>();
+    private final AtomicBoolean isCheckpointing = new AtomicBoolean(false);
     private String kinesisShardId;
-    private volatile String sequenceNumberToCheckpoint = null;
-    private String lastCheckpointedSequenceNumber = null;
+    private final AtomicInteger numRecordsInFlight = new AtomicInteger(0);
+
+    private volatile CheckpointSequenceNumber sequenceNumberNeedToCheckpoint = 
null;
+    private volatile CheckpointSequenceNumber lastCheckpointSequenceNumber = 
null;
 
     public KinesisRecordProcessor(LinkedBlockingQueue<KinesisRecord> queue, 
KinesisSourceConfig config,
-                                  SourceContext sourceContext) {
+                                  SourceContext sourceContext, 
ScheduledExecutorService checkpointExecutor) {
         this.queue = queue;
         this.checkpointInterval = config.getCheckpointInterval();
         this.numRetries = config.getNumRetries();
         this.backoffTime = config.getBackoffTime();
         this.propertiesToInclude = config.getPropertiesToInclude();
         this.sourceContext = sourceContext;
+        this.checkpointExecutor = checkpointExecutor;
     }
 
-    private void checkpoint(RecordProcessorCheckpointer checkpointer, String 
sequenceNumber) {
-        log.info("Checkpointing shard {} at sequence number {}", 
kinesisShardId, sequenceNumber);
-        for (int i = 0; i < numRetries; i++) {
-            try {
-                checkpointer.checkpoint(sequenceNumber);
-                lastCheckpointedSequenceNumber = sequenceNumber;
-                break;
-            } catch (ShutdownException se) {
-                log.info("Caught shutdown exception, skipping checkpoint.", 
se);
-                sourceContext.fatal(se);
-                break;
-            } catch (InvalidStateException e) {
-                log.error("Cannot save checkpoint to the DynamoDB table.", e);
+    private void tryCheckpointWithRetry(RecordProcessorCheckpointer 
checkpointer,
+                                        CheckpointSequenceNumber checkpoint, 
int attempt) {
+        try {
+            log.info("Attempting checkpoint {}/{} for shard {} at {}. 
In-flight records: {}",
+                    attempt, numRetries, kinesisShardId, checkpoint, 
numRecordsInFlight.get());
+            checkpointer.checkpoint(checkpoint.sequenceNumber(), 
checkpoint.subSequenceNumber());
+            lastCheckpointSequenceNumber = checkpoint;
+            log.info("Successfully checkpointed shard {} at {}", 
kinesisShardId, checkpoint);
+            isCheckpointing.set(false);
+            checkpointExecutor.schedule(this::triggerCheckpoint, 
checkpointInterval, TimeUnit.MILLISECONDS);
+        } catch (ShutdownException | InvalidStateException e) {
+            log.error("Caught a non-retryable exception for shard {} during 
checkpoint at {}. Terminating.",
+                    kinesisShardId, checkpoint, e);
+            sourceContext.fatal(e);
+        } catch (ThrottlingException | KinesisClientLibDependencyException e) {
+            if (attempt >= numRetries) {
+                log.error("Checkpoint for shard {} failed after {} attempts at 
{}. Terminating.",
+                        kinesisShardId, numRetries, checkpoint, e);
                 sourceContext.fatal(e);
-                break;
-            } catch (ThrottlingException | KinesisClientLibDependencyException 
e) {
-                if (i >= (numRetries - 1)) {
-                    log.error("Checkpoint failed after {} attempts.", (i + 1), 
e);
-                    sourceContext.fatal(e);
-                    break;
-                }
-            }
-
-            try {
-                Thread.sleep(backoffTime);
-            } catch (InterruptedException e) {
-                log.debug("Interrupted sleep", e);
+            } else {
+                log.warn("Throttling/Dependency error on checkpoint for shard 
{} at {}. Scheduling retry {} "
+                                + "after {}ms.", kinesisShardId, checkpoint, 
attempt + 1, backoffTime);
+                checkpointExecutor.schedule(() -> 
tryCheckpointWithRetry(checkpointer, checkpoint, attempt + 1),
+                        backoffTime, TimeUnit.MILLISECONDS);
             }
         }
     }
 
-    public void updateSequenceNumberToCheckpoint(String sequenceNumber) {
-        this.sequenceNumberToCheckpoint = sequenceNumber;
+    public void updateSequenceNumberToCheckpoint(String sequenceNumber, long 
subSequenceNumber) {
+        CheckpointSequenceNumber newCheckpoint = new 
CheckpointSequenceNumber(sequenceNumber, subSequenceNumber);
+        log.debug("{} Updating sequence number to checkpoint {}", 
kinesisShardId, newCheckpoint);
+        this.sequenceNumberNeedToCheckpoint = newCheckpoint;
+        this.numRecordsInFlight.decrementAndGet();
     }
 
     public void failed() {
+        numRecordsInFlight.decrementAndGet();
         sourceContext.fatal(new PulsarClientException("Failed to process 
Kinesis records due send to pulsar topic"));
     }
 
     @Override
     public void initialize(InitializationInput initializationInput) {
         kinesisShardId = initializationInput.shardId();
-        log.info("Initializing KinesisRecordProcessor for shard {}. Config: 
checkpointInterval={}ms, numRetries={}, "
-                        + "backoffTime={}ms, propertiesToInclude={}",
-                kinesisShardId, checkpointInterval, numRetries, backoffTime, 
propertiesToInclude);
-        nextCheckpointTimeInNanos = System.nanoTime() + checkpointInterval;
+        log.info("Initializing KinesisRecordProcessor for shard {}, 
extendedSequenceNumber: {}, pendingCheckSeq: {}",
+                kinesisShardId, initializationInput.extendedSequenceNumber(),
+                initializationInput.pendingCheckpointSequenceNumber());
+        checkpointExecutor.schedule(this::triggerCheckpoint, 
checkpointInterval, TimeUnit.MILLISECONDS);
+    }
+
+    private void triggerCheckpoint() {
+        if (isCheckpointing.compareAndSet(false, true)) {
+            final RecordProcessorCheckpointer checkpointer = 
checkpointerRef.get();
+            final CheckpointSequenceNumber currentCheckpoint = 
this.sequenceNumberNeedToCheckpoint;
+
+            if (checkpointer != null && currentCheckpoint != null && 
!currentCheckpoint.equals(
+                    lastCheckpointSequenceNumber)) {
+                tryCheckpointWithRetry(checkpointer, currentCheckpoint, 1);
+            } else {
+                isCheckpointing.set(false);
+                checkpointExecutor.schedule(this::triggerCheckpoint, 
checkpointInterval, TimeUnit.MILLISECONDS);
+            }
+        }
     }
 
     @Override
     public void processRecords(ProcessRecordsInput processRecordsInput) {
+        this.checkpointerRef.set(processRecordsInput.checkpointer());
         log.info("Processing {} records from {}", 
processRecordsInput.records().size(), kinesisShardId);
         long millisBehindLatest = processRecordsInput.millisBehindLatest();
 
         for (KinesisClientRecord record : processRecordsInput.records()) {
             try {
+                log.debug("Add record with sequence number {}:{} to queue for 
shard {}.",
+                        record.sequenceNumber(), record.subSequenceNumber(), 
kinesisShardId);
+                numRecordsInFlight.incrementAndGet();

Review Comment:
   Would it be better to move this `numRecordsInFlight.incrementAndGet()` call after the following catch block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to