RobertIndie commented on code in PR #24534: URL: https://github.com/apache/pulsar/pull/24534#discussion_r2218748567
########## pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java: ########## @@ -39,120 +44,178 @@ @Slf4j public class KinesisRecordProcessor implements ShardRecordProcessor { + private record CheckpointSequenceNumber(String sequenceNumber, long subSequenceNumber) {} + private final int numRetries; private final long checkpointInterval; private final long backoffTime; - private final LinkedBlockingQueue<KinesisRecord> queue; private final SourceContext sourceContext; private final Set<String> propertiesToInclude; - - private long nextCheckpointTimeInNanos; + private final ScheduledExecutorService checkpointExecutor; + private final AtomicReference<RecordProcessorCheckpointer> checkpointerRef = new AtomicReference<>(); + private final AtomicBoolean isCheckpointing = new AtomicBoolean(false); private String kinesisShardId; - private volatile String sequenceNumberToCheckpoint = null; - private String lastCheckpointedSequenceNumber = null; + private final AtomicInteger numRecordsInFlight = new AtomicInteger(0); + + private volatile CheckpointSequenceNumber sequenceNumberNeedToCheckpoint = null; + private volatile CheckpointSequenceNumber lastCheckpointSequenceNumber = null; public KinesisRecordProcessor(LinkedBlockingQueue<KinesisRecord> queue, KinesisSourceConfig config, - SourceContext sourceContext) { + SourceContext sourceContext, ScheduledExecutorService checkpointExecutor) { this.queue = queue; this.checkpointInterval = config.getCheckpointInterval(); this.numRetries = config.getNumRetries(); this.backoffTime = config.getBackoffTime(); this.propertiesToInclude = config.getPropertiesToInclude(); this.sourceContext = sourceContext; + this.checkpointExecutor = checkpointExecutor; } - private void checkpoint(RecordProcessorCheckpointer checkpointer, String sequenceNumber) { - log.info("Checkpointing shard {} at sequence number {}", kinesisShardId, sequenceNumber); - for (int i = 0; i < numRetries; i++) { - try { - 
checkpointer.checkpoint(sequenceNumber); - lastCheckpointedSequenceNumber = sequenceNumber; - break; - } catch (ShutdownException se) { - log.info("Caught shutdown exception, skipping checkpoint.", se); - sourceContext.fatal(se); - break; - } catch (InvalidStateException e) { - log.error("Cannot save checkpoint to the DynamoDB table.", e); + private void tryCheckpointWithRetry(RecordProcessorCheckpointer checkpointer, + CheckpointSequenceNumber checkpoint, int attempt) { + try { + log.info("Attempting checkpoint {}/{} for shard {} at {}. In-flight records: {}", + attempt, numRetries, kinesisShardId, checkpoint, numRecordsInFlight.get()); + checkpointer.checkpoint(checkpoint.sequenceNumber(), checkpoint.subSequenceNumber()); + lastCheckpointSequenceNumber = checkpoint; + log.info("Successfully checkpointed shard {} at {}", kinesisShardId, checkpoint); + isCheckpointing.set(false); + checkpointExecutor.schedule(this::triggerCheckpoint, checkpointInterval, TimeUnit.MILLISECONDS); + } catch (ShutdownException | InvalidStateException e) { + log.error("Caught a non-retryable exception for shard {} during checkpoint at {}. Terminating.", + kinesisShardId, checkpoint, e); + sourceContext.fatal(e); + } catch (ThrottlingException | KinesisClientLibDependencyException e) { + if (attempt >= numRetries) { + log.error("Checkpoint for shard {} failed after {} attempts at {}. Terminating.", + kinesisShardId, numRetries, checkpoint, e); sourceContext.fatal(e); - break; - } catch (ThrottlingException | KinesisClientLibDependencyException e) { - if (i >= (numRetries - 1)) { - log.error("Checkpoint failed after {} attempts.", (i + 1), e); - sourceContext.fatal(e); - break; - } - } - - try { - Thread.sleep(backoffTime); - } catch (InterruptedException e) { - log.debug("Interrupted sleep", e); + } else { + log.warn("Throttling/Dependency error on checkpoint for shard {} at {}. 
Scheduling retry {} " + + "after {}ms.", kinesisShardId, checkpoint, attempt + 1, backoffTime); + checkpointExecutor.schedule(() -> tryCheckpointWithRetry(checkpointer, checkpoint, attempt + 1), + backoffTime, TimeUnit.MILLISECONDS); } } } - public void updateSequenceNumberToCheckpoint(String sequenceNumber) { - this.sequenceNumberToCheckpoint = sequenceNumber; + public void updateSequenceNumberToCheckpoint(String sequenceNumber, long subSequenceNumber) { + CheckpointSequenceNumber newCheckpoint = new CheckpointSequenceNumber(sequenceNumber, subSequenceNumber); + log.debug("{} Updating sequence number to checkpoint {}", kinesisShardId, newCheckpoint); + this.sequenceNumberNeedToCheckpoint = newCheckpoint; + this.numRecordsInFlight.decrementAndGet(); } public void failed() { + numRecordsInFlight.decrementAndGet(); sourceContext.fatal(new PulsarClientException("Failed to process Kinesis records due send to pulsar topic")); } @Override public void initialize(InitializationInput initializationInput) { kinesisShardId = initializationInput.shardId(); - log.info("Initializing KinesisRecordProcessor for shard {}. 
Config: checkpointInterval={}ms, numRetries={}, " - + "backoffTime={}ms, propertiesToInclude={}", - kinesisShardId, checkpointInterval, numRetries, backoffTime, propertiesToInclude); - nextCheckpointTimeInNanos = System.nanoTime() + checkpointInterval; + log.info("Initializing KinesisRecordProcessor for shard {}, extendedSequenceNumber: {}, pendingCheckSeq: {}", + kinesisShardId, initializationInput.extendedSequenceNumber(), + initializationInput.pendingCheckpointSequenceNumber()); + checkpointExecutor.schedule(this::triggerCheckpoint, checkpointInterval, TimeUnit.MILLISECONDS); + } + + private void triggerCheckpoint() { + if (isCheckpointing.compareAndSet(false, true)) { + final RecordProcessorCheckpointer checkpointer = checkpointerRef.get(); + final CheckpointSequenceNumber currentCheckpoint = this.sequenceNumberNeedToCheckpoint; + + if (checkpointer != null && currentCheckpoint != null && !currentCheckpoint.equals( + lastCheckpointSequenceNumber)) { + tryCheckpointWithRetry(checkpointer, currentCheckpoint, 1); + } else { + isCheckpointing.set(false); + checkpointExecutor.schedule(this::triggerCheckpoint, checkpointInterval, TimeUnit.MILLISECONDS); + } + } Review Comment: We should add a try-catch to this method, or inside `tryCheckpointWithRetry`, that catches any runtime exception/`Throwable` (e.g. an NPE). If a task running on the `ScheduledExecutorService` throws, the exception is only captured in the task's `Future` and nothing is logged, which makes the failure hard to debug. Because this task reschedules itself only at the end, an uncaught exception would also silently stop all further checkpointing.
########## pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java: ########## @@ -39,120 +44,178 @@ @Slf4j public class KinesisRecordProcessor implements ShardRecordProcessor { + private record CheckpointSequenceNumber(String sequenceNumber, long subSequenceNumber) {} + private final int numRetries; private final long checkpointInterval; private final long backoffTime; - private final LinkedBlockingQueue<KinesisRecord> queue; private final SourceContext sourceContext; private final Set<String> propertiesToInclude; - - private long nextCheckpointTimeInNanos; + private final ScheduledExecutorService checkpointExecutor; + private final AtomicReference<RecordProcessorCheckpointer> checkpointerRef = new AtomicReference<>(); + private final AtomicBoolean isCheckpointing = new AtomicBoolean(false); private String kinesisShardId; - private volatile String sequenceNumberToCheckpoint = null; - private String lastCheckpointedSequenceNumber = null; + private final AtomicInteger numRecordsInFlight = new AtomicInteger(0); + + private volatile CheckpointSequenceNumber sequenceNumberNeedToCheckpoint = null; + private volatile CheckpointSequenceNumber lastCheckpointSequenceNumber = null; public KinesisRecordProcessor(LinkedBlockingQueue<KinesisRecord> queue, KinesisSourceConfig config, - SourceContext sourceContext) { + SourceContext sourceContext, ScheduledExecutorService checkpointExecutor) { this.queue = queue; this.checkpointInterval = config.getCheckpointInterval(); this.numRetries = config.getNumRetries(); this.backoffTime = config.getBackoffTime(); this.propertiesToInclude = config.getPropertiesToInclude(); this.sourceContext = sourceContext; + this.checkpointExecutor = checkpointExecutor; } - private void checkpoint(RecordProcessorCheckpointer checkpointer, String sequenceNumber) { - log.info("Checkpointing shard {} at sequence number {}", kinesisShardId, sequenceNumber); - for (int i = 0; i < numRetries; i++) { - try { - 
checkpointer.checkpoint(sequenceNumber); - lastCheckpointedSequenceNumber = sequenceNumber; - break; - } catch (ShutdownException se) { - log.info("Caught shutdown exception, skipping checkpoint.", se); - sourceContext.fatal(se); - break; - } catch (InvalidStateException e) { - log.error("Cannot save checkpoint to the DynamoDB table.", e); + private void tryCheckpointWithRetry(RecordProcessorCheckpointer checkpointer, + CheckpointSequenceNumber checkpoint, int attempt) { + try { + log.info("Attempting checkpoint {}/{} for shard {} at {}. In-flight records: {}", + attempt, numRetries, kinesisShardId, checkpoint, numRecordsInFlight.get()); + checkpointer.checkpoint(checkpoint.sequenceNumber(), checkpoint.subSequenceNumber()); + lastCheckpointSequenceNumber = checkpoint; + log.info("Successfully checkpointed shard {} at {}", kinesisShardId, checkpoint); + isCheckpointing.set(false); + checkpointExecutor.schedule(this::triggerCheckpoint, checkpointInterval, TimeUnit.MILLISECONDS); + } catch (ShutdownException | InvalidStateException e) { + log.error("Caught a non-retryable exception for shard {} during checkpoint at {}. Terminating.", + kinesisShardId, checkpoint, e); + sourceContext.fatal(e); + } catch (ThrottlingException | KinesisClientLibDependencyException e) { + if (attempt >= numRetries) { + log.error("Checkpoint for shard {} failed after {} attempts at {}. Terminating.", + kinesisShardId, numRetries, checkpoint, e); sourceContext.fatal(e); - break; - } catch (ThrottlingException | KinesisClientLibDependencyException e) { - if (i >= (numRetries - 1)) { - log.error("Checkpoint failed after {} attempts.", (i + 1), e); - sourceContext.fatal(e); - break; - } - } - - try { - Thread.sleep(backoffTime); - } catch (InterruptedException e) { - log.debug("Interrupted sleep", e); + } else { + log.warn("Throttling/Dependency error on checkpoint for shard {} at {}. 
Scheduling retry {} " + + "after {}ms.", kinesisShardId, checkpoint, attempt + 1, backoffTime); + checkpointExecutor.schedule(() -> tryCheckpointWithRetry(checkpointer, checkpoint, attempt + 1), + backoffTime, TimeUnit.MILLISECONDS); } } } - public void updateSequenceNumberToCheckpoint(String sequenceNumber) { - this.sequenceNumberToCheckpoint = sequenceNumber; + public void updateSequenceNumberToCheckpoint(String sequenceNumber, long subSequenceNumber) { + CheckpointSequenceNumber newCheckpoint = new CheckpointSequenceNumber(sequenceNumber, subSequenceNumber); + log.debug("{} Updating sequence number to checkpoint {}", kinesisShardId, newCheckpoint); + this.sequenceNumberNeedToCheckpoint = newCheckpoint; + this.numRecordsInFlight.decrementAndGet(); } public void failed() { + numRecordsInFlight.decrementAndGet(); sourceContext.fatal(new PulsarClientException("Failed to process Kinesis records due send to pulsar topic")); } @Override public void initialize(InitializationInput initializationInput) { kinesisShardId = initializationInput.shardId(); - log.info("Initializing KinesisRecordProcessor for shard {}. 
Config: checkpointInterval={}ms, numRetries={}, " - + "backoffTime={}ms, propertiesToInclude={}", - kinesisShardId, checkpointInterval, numRetries, backoffTime, propertiesToInclude); - nextCheckpointTimeInNanos = System.nanoTime() + checkpointInterval; + log.info("Initializing KinesisRecordProcessor for shard {}, extendedSequenceNumber: {}, pendingCheckSeq: {}", + kinesisShardId, initializationInput.extendedSequenceNumber(), + initializationInput.pendingCheckpointSequenceNumber()); + checkpointExecutor.schedule(this::triggerCheckpoint, checkpointInterval, TimeUnit.MILLISECONDS); + } + + private void triggerCheckpoint() { + if (isCheckpointing.compareAndSet(false, true)) { + final RecordProcessorCheckpointer checkpointer = checkpointerRef.get(); + final CheckpointSequenceNumber currentCheckpoint = this.sequenceNumberNeedToCheckpoint; + + if (checkpointer != null && currentCheckpoint != null && !currentCheckpoint.equals( + lastCheckpointSequenceNumber)) { + tryCheckpointWithRetry(checkpointer, currentCheckpoint, 1); + } else { + isCheckpointing.set(false); + checkpointExecutor.schedule(this::triggerCheckpoint, checkpointInterval, TimeUnit.MILLISECONDS); + } + } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { + this.checkpointerRef.set(processRecordsInput.checkpointer()); log.info("Processing {} records from {}", processRecordsInput.records().size(), kinesisShardId); long millisBehindLatest = processRecordsInput.millisBehindLatest(); for (KinesisClientRecord record : processRecordsInput.records()) { try { + log.debug("Add record with sequence number {}:{} to queue for shard {}.", + record.sequenceNumber(), record.subSequenceNumber(), kinesisShardId); + numRecordsInFlight.incrementAndGet(); Review Comment: Is it better to move this line after the following catch block? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org