pnowojski commented on a change in pull request #18852:
URL: https://github.com/apache/flink/pull/18852#discussion_r817861949
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -105,12 +105,12 @@
/** The checkpoint properties. */
private final CheckpointProperties props;
- /** Target storage location to persist the checkpoint metadata to. */
- private final CheckpointStorageLocation targetLocation;
-
/** The promise to fulfill once the checkpoint has been completed. */
private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
+ /** Target storage location to persist the checkpoint metadata to. */
+ private CheckpointStorageLocation targetLocation;
Review comment:
nit: `@Nullable`, since `targetLocation` is no longer final and stays `null` until the location is set.
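A minimal sketch of what the nit suggests, assuming a heavily simplified `PendingCheckpoint`. The field is annotated as nullable, and the accessors make the "not yet initialized" state explicit. `PendingCheckpointSketch`, `StorageLocation` and the local `Nullable` annotation are hypothetical stand-ins; Flink code generally uses `javax.annotation.Nullable`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.Objects;

// Local stand-in for javax.annotation.Nullable (JSR-305) so this sketch
// compiles without extra dependencies.
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
@interface Nullable {}

// Hypothetical stand-in for CheckpointStorageLocation.
interface StorageLocation {
    String path();
}

// Hypothetical, heavily simplified PendingCheckpoint.
class PendingCheckpointSketch {
    /** Target storage location; null until the location has been initialized. */
    @Nullable private StorageLocation targetLocation;

    void setCheckpointTargetLocation(StorageLocation location) {
        this.targetLocation = Objects.requireNonNull(location, "location");
    }

    /** May return null, which the annotation documents for callers. */
    @Nullable
    StorageLocation getCheckpointTargetLocation() {
        return targetLocation;
    }

    /** Fails fast instead of dereferencing a null location later. */
    StorageLocation getTargetLocationOrThrow() {
        if (targetLocation == null) {
            throw new IllegalStateException("Target location has not been initialized yet");
        }
        return targetLocation;
    }
}
```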
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -545,14 +545,12 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
.thenApplyAsync(
plan -> {
try {
- CheckpointIdAndStorageLocation
- checkpointIdAndStorageLocation =
- initializeCheckpoint(
- request.props,
- request.externalSavepointLocation,
- initializeBaseLocations);
- return new Tuple2<>(
- plan, checkpointIdAndStorageLocation);
Review comment:
Also, I have some small doubts about splitting one async IO action into two smaller ones, which can cause some extra overhead. As I understand it, the reason is that you want an existing pending checkpoint when handling errors from `initializeCheckpointLocation`, right? Wouldn't an alternative solution be for the error-handling logic to lazily (on demand) create a `PendingCheckpoint` when it handles an error for a checkpoint that doesn't have one yet? Have you considered this approach, and what pros and cons would it have?
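A minimal sketch of the lazy alternative, assuming a simplified coordinator that keeps pending checkpoints in a map keyed by checkpoint ID. `LazyFailureHandlingSketch`, `Pending`, `register` and `onTriggerFailure` are hypothetical names, not Flink APIs; the point is only that the failure path creates the missing object on demand, so storage-location initialization could stay in a single async step.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a coordinator's failure path.
class LazyFailureHandlingSketch {
    /** Minimal stand-in for PendingCheckpoint: only tracks the abort cause. */
    static final class Pending {
        final long checkpointId;
        Throwable failureCause;

        Pending(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        void abort(Throwable cause) {
            this.failureCause = cause;
        }
    }

    private final Object lock = new Object();
    private final Map<Long, Pending> pendingCheckpoints = new HashMap<>();

    /** Normal path: registers (or returns) the pending checkpoint for this ID. */
    Pending register(long checkpointId) {
        synchronized (lock) {
            return pendingCheckpoints.computeIfAbsent(checkpointId, id -> new Pending(id));
        }
    }

    /**
     * Failure path: if triggering failed before a Pending was registered,
     * create one on demand so the usual abort logic still applies, then drop it.
     */
    Pending onTriggerFailure(long checkpointId, Throwable cause) {
        synchronized (lock) {
            Pending pending =
                    pendingCheckpoints.computeIfAbsent(checkpointId, id -> new Pending(id));
            pending.abort(cause);
            pendingCheckpoints.remove(checkpointId);
            return pending;
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return pendingCheckpoints.size();
        }
    }
}
```

A possible downside of this design is that every failure path has to remember to go through the on-demand creation, whereas creating the pending checkpoint up front gives all error handlers the same object.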
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -565,20 +563,46 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
request.props,
checkpointInfo.f0,
request.isPeriodic,
- checkpointInfo.f1.checkpointId,
- checkpointInfo.f1.checkpointStorageLocation,
+ checkpointInfo.f1,
request.getOnCompletionFuture()),
timer);
final CompletableFuture<?> coordinatorCheckpointsComplete =
- pendingCheckpointCompletableFuture.thenComposeAsync(
- (pendingCheckpoint) ->
- OperatorCoordinatorCheckpoints
- .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
- coordinatorsToCheckpoint,
- pendingCheckpoint,
- timer),
- timer);
+ pendingCheckpointCompletableFuture
+ .thenApplyAsync(
+ pendingCheckpoint -> {
+ try {
+ CheckpointStorageLocation checkpointStorageLocation =
+ initializeCheckpointLocation(
+ pendingCheckpoint.getCheckpointID(),
+ request.props,
+ request.externalSavepointLocation,
+ initializeBaseLocations);
+ return Tuple2.of(
+ pendingCheckpoint, checkpointStorageLocation);
+ } catch (Throwable e) {
+ throw new CompletionException(e);
+ }
+ },
+ executor)
+ .thenApplyAsync(
+ (checkpointInfo) -> {
+ PendingCheckpoint pendingCheckpoint = checkpointInfo.f0;
+ synchronized (lock) {
+ pendingCheckpoint.setCheckpointTargetLocation(
+ checkpointInfo.f1);
+ }
+ return pendingCheckpoint;
+ },
+ timer)
+ .thenComposeAsync(
+ (pendingCheckpoint) ->
+ OperatorCoordinatorCheckpoints
+ .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
+ coordinatorsToCheckpoint,
+ pendingCheckpoint,
+ timer),
+ timer);
Review comment:
Why do we need these two lambda functions to be separate? Both are non-blocking and run in the `timer` thread, so why can't we squash them into one?
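A minimal sketch of the squashed version, assuming simplified stand-ins: `SquashedStagesSketch`, `Pending`, `CheckpointInfo` (playing the role of the `Tuple2`) and the `triggerCoordinators` function are hypothetical. Setting the target location under the lock and triggering the coordinators happen in a single `thenComposeAsync` stage on the same executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

class SquashedStagesSketch {
    /** Hypothetical stand-in for PendingCheckpoint. */
    static final class Pending {
        String targetLocation;
    }

    /** Hypothetical stand-in for Tuple2<PendingCheckpoint, CheckpointStorageLocation>. */
    static final class CheckpointInfo {
        final Pending pending;
        final String location;

        CheckpointInfo(Pending pending, String location) {
            this.pending = pending;
            this.location = location;
        }
    }

    /**
     * One stage instead of thenApplyAsync(..., timer) followed by
     * thenComposeAsync(..., timer): set the location, then trigger.
     */
    static CompletableFuture<String> setLocationAndTrigger(
            CompletableFuture<CheckpointInfo> infoFuture,
            Object lock,
            Function<Pending, CompletableFuture<String>> triggerCoordinators,
            Executor timer) {
        return infoFuture.thenComposeAsync(
                info -> {
                    synchronized (lock) {
                        info.pending.targetLocation = info.location;
                    }
                    return triggerCoordinators.apply(info.pending);
                },
                timer);
    }
}
```

This saves one hop through the `timer` executor per checkpoint; an exception from either step still completes the returned future exceptionally.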
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -545,14 +545,12 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
.thenApplyAsync(
plan -> {
try {
- CheckpointIdAndStorageLocation
- checkpointIdAndStorageLocation =
- initializeCheckpoint(
- request.props,
- request.externalSavepointLocation,
- initializeBaseLocations);
- return new Tuple2<>(
- plan, checkpointIdAndStorageLocation);
Review comment:
Initializing the storage location was moved after creating the pending checkpoint for error-handling reasons, right? If so, why doesn't leaving `checkpointIdCounter.getAndIncrement()` here cause the same problems?
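A minimal sketch of the concern, assuming ID allocation can throw (a counter backed by external storage can fail). If the first async stage fails while allocating the ID, the failure handler runs before any pending checkpoint exists, which looks like the situation the reordering of `initializeCheckpointLocation` was meant to avoid. `IdAllocationFailureSketch`, `IdCounter` and `Pending` are hypothetical stand-ins, not Flink classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

class IdAllocationFailureSketch {
    /** Hypothetical stand-in for a checkpoint ID counter whose allocation may throw. */
    interface IdCounter {
        long getAndIncrement() throws Exception;
    }

    /** Hypothetical stand-in for PendingCheckpoint. */
    static final class Pending {
        final long id;

        Pending(long id) {
            this.id = id;
        }
    }

    /**
     * Allocates the ID on the I/O executor, then creates the pending checkpoint.
     * Completes with the Pending, or with null when the failure handler ran
     * without any pending checkpoint to clean up.
     */
    static CompletableFuture<Pending> trigger(IdCounter counter, Executor io) {
        CompletableFuture<Long> idFuture =
                CompletableFuture.supplyAsync(
                        () -> {
                            try {
                                return counter.getAndIncrement();
                            } catch (Exception e) {
                                throw new CompletionException(e);
                            }
                        },
                        io);
        return idFuture
                .thenApply(id -> new Pending(id))
                // On failure, 'pending' is null: the handler has no PendingCheckpoint.
                .handle((pending, failure) -> failure == null ? pending : null);
    }
}
```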
--
This is an automated message from the Apache Git Service.