This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5ce2e062cdb1c7dfd7e641cd7f10de04440a0583 Author: fanrui <[email protected]> AuthorDate: Mon Feb 28 11:35:09 2022 +0800 [FLINK-26049][checkpoint] initialize CheckpointLocation after create PendingCheckpoint --- .../runtime/checkpoint/CheckpointCoordinator.java | 86 +++++++++++----------- .../runtime/checkpoint/PendingCheckpoint.java | 16 ++-- .../checkpoint/CheckpointCoordinatorTest.java | 10 ++- .../runtime/checkpoint/PendingCheckpointTest.java | 24 +++--- 4 files changed, 75 insertions(+), 61 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 475effc..4efff80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -545,14 +545,12 @@ public class CheckpointCoordinator { .thenApplyAsync( plan -> { try { - CheckpointIdAndStorageLocation - checkpointIdAndStorageLocation = - initializeCheckpoint( - request.props, - request.externalSavepointLocation, - initializeBaseLocations); - return new Tuple2<>( - plan, checkpointIdAndStorageLocation); + // this must happen outside the coordinator-wide lock, + // because it communicates with external services + // (in HA mode) and may block for a while. + long checkpointID = + checkpointIdCounter.getAndIncrement(); + return new Tuple2<>(plan, checkpointID); } catch (Throwable e) { throw new CompletionException(e); } @@ -565,20 +563,42 @@ public class CheckpointCoordinator { request.props, checkpointInfo.f0, request.isPeriodic, - checkpointInfo.f1.checkpointId, - checkpointInfo.f1.checkpointStorageLocation, + checkpointInfo.f1, request.getOnCompletionFuture()), timer); final CompletableFuture<?> coordinatorCheckpointsComplete = - pendingCheckpointCompletableFuture.thenComposeAsync( - (pendingCheckpoint) -> - OperatorCoordinatorCheckpoints - .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion( - coordinatorsToCheckpoint, - pendingCheckpoint, - timer), - timer); + pendingCheckpointCompletableFuture + .thenApplyAsync( + pendingCheckpoint -> { + try { + CheckpointStorageLocation checkpointStorageLocation = + initializeCheckpointLocation( + pendingCheckpoint.getCheckpointID(), + request.props, + request.externalSavepointLocation, + initializeBaseLocations); + return Tuple2.of( + pendingCheckpoint, checkpointStorageLocation); + } catch (Throwable e) { + throw new CompletionException(e); + } + }, + executor) + .thenComposeAsync( + (checkpointInfo) -> { + PendingCheckpoint pendingCheckpoint = checkpointInfo.f0; + synchronized (lock) { + pendingCheckpoint.setCheckpointTargetLocation( + checkpointInfo.f1); + } + return OperatorCoordinatorCheckpoints + .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion( + coordinatorsToCheckpoint, + pendingCheckpoint, + timer); + }, + timer); // We have to take the snapshot of the master hooks after the coordinator checkpoints // has completed. @@ -726,24 +746,20 @@ public class CheckpointCoordinator { } /** - * Initialize the checkpoint trigger asynchronously. It will expected to be executed in io + * Initialize the checkpoint location asynchronously. It will expected to be executed in io * thread due to it might be time-consuming. * + * @param checkpointID checkpoint id * @param props checkpoint properties * @param externalSavepointLocation the external savepoint location, it might be null - * @return the initialized result, checkpoint id and checkpoint location + * @return the checkpoint location */ - private CheckpointIdAndStorageLocation initializeCheckpoint( + private CheckpointStorageLocation initializeCheckpointLocation( + long checkpointID, CheckpointProperties props, @Nullable String externalSavepointLocation, boolean initializeBaseLocations) throws Exception { - - // this must happen outside the coordinator-wide lock, because it - // communicates - // with external services (in HA mode) and may block for a while. - long checkpointID = checkpointIdCounter.getAndIncrement(); - final CheckpointStorageLocation checkpointStorageLocation; if (props.isSavepoint()) { checkpointStorageLocation = @@ -757,7 +773,7 @@ public class CheckpointCoordinator { checkpointStorageView.initializeLocationForCheckpoint(checkpointID); } - return new CheckpointIdAndStorageLocation(checkpointID, checkpointStorageLocation); + return checkpointStorageLocation; } private PendingCheckpoint createPendingCheckpoint( @@ -766,7 +782,6 @@ public class CheckpointCoordinator { CheckpointPlan checkpointPlan, boolean isPeriodic, long checkpointID, - CheckpointStorageLocation checkpointStorageLocation, CompletableFuture<CompletedCheckpoint> onCompletionPromise) { synchronized (lock) { @@ -791,7 +806,6 @@ public class CheckpointCoordinator { OperatorInfo.getIds(coordinatorsToCheckpoint), masterHooks.keySet(), props, - checkpointStorageLocation, onCompletionPromise, pendingCheckpointStats); @@ -2161,18 +2175,6 @@ public class CheckpointCoordinator { } } - private static class CheckpointIdAndStorageLocation { - private final long checkpointId; - private final CheckpointStorageLocation checkpointStorageLocation; - - CheckpointIdAndStorageLocation( - long checkpointId, CheckpointStorageLocation checkpointStorageLocation) { - - this.checkpointId = checkpointId; - this.checkpointStorageLocation = checkNotNull(checkpointStorageLocation); - } - } - static class CheckpointTriggerRequest { final long timestamp; final CheckpointProperties props; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 08cd23f..9229024 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -107,14 +107,14 @@ public class PendingCheckpoint implements Checkpoint { /** The checkpoint properties. */ private final CheckpointProperties props; - /** Target storage location to persist the checkpoint metadata to. */ - private final CheckpointStorageLocation targetLocation; - /** The promise to fulfill once the checkpoint has been completed. */ private final CompletableFuture<CompletedCheckpoint> onCompletionPromise; @Nullable private final PendingCheckpointStats pendingCheckpointStats; + /** Target storage location to persist the checkpoint metadata to. */ + @Nullable private CheckpointStorageLocation targetLocation; + private int numAcknowledgedTasks; private boolean disposed; @@ -135,7 +135,6 @@ public class PendingCheckpoint implements Checkpoint { Collection<OperatorID> operatorCoordinatorsToConfirm, Collection<String> masterStateIdentifiers, CheckpointProperties props, - CheckpointStorageLocation targetLocation, CompletableFuture<CompletedCheckpoint> onCompletionPromise, @Nullable PendingCheckpointStats pendingCheckpointStats) { checkArgument( @@ -153,7 +152,6 @@ public class PendingCheckpoint implements Checkpoint { } this.props = checkNotNull(props); - this.targetLocation = checkNotNull(targetLocation); this.operatorStates = new HashMap<>(); this.masterStates = new ArrayList<>(masterStateIdentifiers.size()); @@ -191,6 +189,10 @@ public class PendingCheckpoint implements Checkpoint { return checkpointId; } + public void setCheckpointTargetLocation(CheckpointStorageLocation targetLocation) { + this.targetLocation = targetLocation; + } + public CheckpointStorageLocation getCheckpointStorageLocation() { return targetLocation; } @@ -645,7 +647,9 @@ public class PendingCheckpoint implements Checkpoint { // unregistered shared states are still considered private at this point. try { StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); - targetLocation.disposeOnFailure(); + if (targetLocation != null) { + targetLocation.disposeOnFailure(); + } } catch (Throwable t) { LOG.warn( "Could not properly dispose the private states in the pending checkpoint {} of job {}.", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 3c09945..b493ef4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -727,11 +727,14 @@ public class CheckpointCoordinatorTest extends TestLogger { /** Tests that do not trigger checkpoint when IOException occurred. */ @Test - public void testTriggerCheckpointAfterIOException() throws Exception { - // given: Checkpoint coordinator which fails on initializeLocationForCheckpoint. + public void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception { + // given: Checkpoint coordinator which fails on initializeCheckpointLocation. TestFailJobCallback failureCallback = new TestFailJobCallback(); + CheckpointStatsTracker statsTracker = + new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() + .setCheckpointStatsTracker(statsTracker) .setFailureManager(new CheckpointFailureManager(0, failureCallback)) .setCheckpointStorage(new IOExceptionCheckpointStorage()) .setTimer(manuallyTriggeredScheduledExecutor) @@ -741,6 +744,9 @@ public class CheckpointCoordinatorTest extends TestLogger { // then: Failure manager should fail the job. assertEquals(1, failureCallback.getInvokeCounter()); + + // then: Should created PendingCheckpoint + assertNotNull(statsTracker.getPendingCheckpointStats(1)); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 799d743..a0b07b0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -634,17 +634,19 @@ public class PendingCheckpointTest { 1024, 4096); - return new PendingCheckpoint( - new JobID(), - 0, - 1, - checkpointPlan, - operatorCoordinators, - masterStateIdentifiers, - props, - location, - new CompletableFuture<>(), - null); + PendingCheckpoint pendingCheckpoint = + new PendingCheckpoint( + new JobID(), + 0, + 1, + checkpointPlan, + operatorCoordinators, + masterStateIdentifiers, + props, + new CompletableFuture<>(), + null); + pendingCheckpoint.setCheckpointTargetLocation(location); + return pendingCheckpoint; } @SuppressWarnings("unchecked")
