AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829847948
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -162,19 +191,21 @@ protected void advanceToEndOfEventTime() {
// --------------------------
private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
- final CheckpointOptions checkpointOptions =
- CheckpointOptions.forConfig(
- CheckpointType.CHECKPOINT,
- CheckpointStorageLocationReference.getDefault(),
- configuration.isExactlyOnceCheckpointMode(),
- configuration.isUnalignedCheckpointsEnabled(),
- configuration.getAlignedCheckpointTimeout().toMillis());
- final long timestamp = System.currentTimeMillis();
-
- final CheckpointMetaData checkpointMetaData =
- new CheckpointMetaData(checkpointId, timestamp, timestamp);
-
- super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+ // cleanup any old checkpoint that was cancelled before trigger
+ untriggeredCheckpoints.headMap(checkpointId).clear();
+ UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+ if (untriggeredCheckpoint != null) {
+ // common case: RPC before external sources induces it
+ super.triggerCheckpointAsync(
+ untriggeredCheckpoint.getMetadata(),
+ untriggeredCheckpoint.getCheckpointOptions());
+ } else {
+ // rare case: external source induced first
+ // note that at this point, we should probably not emit more data such that data is
+ // properly aligned
+ // however, unless we receive a reliable checkpoint abort RPC, this may deadlock
Review comment:
We should probably discuss if this is the best choice.
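
As background for that discussion, the cleanup-then-lookup step in the added lines can be sketched in isolation. This is only a minimal sketch, assuming `untriggeredCheckpoints` is a sorted map keyed by checkpoint ID (its declaration is not shown in this hunk). The class name, the `takePending` helper, and the `String` values standing in for `UntriggeredCheckpoint` are all hypothetical.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-alone sketch; this is not the Flink class from the PR.
class UntriggeredCheckpointsSketch {

    /**
     * Drops every pending entry older than checkpointId, then removes and
     * returns the entry stored for checkpointId itself, or null if the RPC
     * for that checkpoint has not arrived yet (the "rare case" branch).
     */
    static String takePending(NavigableMap<Long, String> pending, long checkpointId) {
        // headMap(key) is a live view of all keys strictly less than key,
        // so clearing the view deletes those entries from the backing map.
        pending.headMap(checkpointId).clear();
        return pending.remove(checkpointId);
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> pending = new TreeMap<>();
        pending.put(1L, "options-1"); // stale: cancelled before trigger
        pending.put(2L, "options-2"); // stale: cancelled before trigger
        pending.put(3L, "options-3"); // RPC arrived before the source induced it
        pending.put(4L, "options-4"); // newer checkpoint, must survive

        System.out.println(takePending(pending, 3L)); // entry for checkpoint 3
        System.out.println(pending.keySet());         // only newer IDs remain
        System.out.println(takePending(pending, 5L)); // no RPC stored for 5
    }
}
```

A `null` result corresponds to the `else` branch in the diff, where the source induced the checkpoint before the RPC arrived. How the task should behave while it waits for that RPC is the open question raised above.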