This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 970932bfe207617be45568ea6ad5c6357a412fc7 Author: fanrui <[email protected]> AuthorDate: Mon Feb 28 11:35:09 2022 +0800 [FLINK-26049][checkpoint] initialize CheckpointLocation after create PendingCheckpoint --- .../runtime/checkpoint/CheckpointCoordinator.java | 95 +++++++++++----------- .../runtime/checkpoint/PendingCheckpoint.java | 16 ++-- .../checkpoint/CheckpointCoordinatorTest.java | 14 +++- .../CheckpointCoordinatorTestingUtils.java | 45 ++++++---- .../runtime/checkpoint/PendingCheckpointTest.java | 22 ++--- 5 files changed, 113 insertions(+), 79 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index a2bff9d..384e047 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -541,13 +541,12 @@ public class CheckpointCoordinator { .thenApplyAsync( plan -> { try { - CheckpointIdAndStorageLocation - checkpointIdAndStorageLocation = - initializeCheckpoint( - request.props, - request.externalSavepointLocation); - return new Tuple2<>( - plan, checkpointIdAndStorageLocation); + // this must happen outside the coordinator-wide lock, + // because it communicates with external services + // (in HA mode) and may block for a while. + long checkpointID = + checkpointIdCounter.getAndIncrement(); + return new Tuple2<>(plan, checkpointID); } catch (Throwable e) { throw new CompletionException(e); } @@ -560,20 +559,41 @@ public class CheckpointCoordinator { request.props, checkpointInfo.f0, request.isPeriodic, - checkpointInfo.f1.checkpointId, - checkpointInfo.f1.checkpointStorageLocation, + checkpointInfo.f1, request.getOnCompletionFuture()), timer); final CompletableFuture<?> coordinatorCheckpointsComplete = - pendingCheckpointCompletableFuture.thenComposeAsync( - (pendingCheckpoint) -> - OperatorCoordinatorCheckpoints - .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion( - coordinatorsToCheckpoint, - pendingCheckpoint, - timer), - timer); + pendingCheckpointCompletableFuture + .thenApplyAsync( + pendingCheckpoint -> { + try { + CheckpointStorageLocation checkpointStorageLocation = + initializeCheckpointLocation( + pendingCheckpoint.getCheckpointID(), + request.props, + request.externalSavepointLocation); + return Tuple2.of( + pendingCheckpoint, checkpointStorageLocation); + } catch (Throwable e) { + throw new CompletionException(e); + } + }, + executor) + .thenComposeAsync( + (checkpointInfo) -> { + PendingCheckpoint pendingCheckpoint = checkpointInfo.f0; + synchronized (lock) { + pendingCheckpoint.setCheckpointTargetLocation( + checkpointInfo.f1); + } + return OperatorCoordinatorCheckpoints + .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion( + coordinatorsToCheckpoint, + pendingCheckpoint, + timer); + }, + timer); // We have to take the snapshot of the master hooks after the coordinator checkpoints // has completed. @@ -714,29 +734,24 @@ public class CheckpointCoordinator { } /** - * Initialize the checkpoint trigger asynchronously. It will expected to be executed in io + * Initialize the checkpoint location asynchronously. It will expected to be executed in io * thread due to it might be time-consuming. * + * @param checkpointID checkpoint id * @param props checkpoint properties * @param externalSavepointLocation the external savepoint location, it might be null - * @return the initialized result, checkpoint id and checkpoint location + * @return the checkpoint location */ - private CheckpointIdAndStorageLocation initializeCheckpoint( - CheckpointProperties props, @Nullable String externalSavepointLocation) + private CheckpointStorageLocation initializeCheckpointLocation( + long checkpointID, + CheckpointProperties props, + @Nullable String externalSavepointLocation) throws Exception { - // this must happen outside the coordinator-wide lock, because it - // communicates - // with external services (in HA mode) and may block for a while. - long checkpointID = checkpointIdCounter.getAndIncrement(); - - CheckpointStorageLocation checkpointStorageLocation = - props.isSavepoint() - ? checkpointStorageView.initializeLocationForSavepoint( - checkpointID, externalSavepointLocation) - : checkpointStorageView.initializeLocationForCheckpoint(checkpointID); - - return new CheckpointIdAndStorageLocation(checkpointID, checkpointStorageLocation); + return props.isSavepoint() + ? checkpointStorageView.initializeLocationForSavepoint( + checkpointID, externalSavepointLocation) + : checkpointStorageView.initializeLocationForCheckpoint(checkpointID); } private PendingCheckpoint createPendingCheckpoint( @@ -745,7 +760,6 @@ public class CheckpointCoordinator { CheckpointPlan checkpointPlan, boolean isPeriodic, long checkpointID, - CheckpointStorageLocation checkpointStorageLocation, CompletableFuture<CompletedCheckpoint> onCompletionPromise) { synchronized (lock) { @@ -767,7 +781,6 @@ public class CheckpointCoordinator { OperatorInfo.getIds(coordinatorsToCheckpoint), masterHooks.keySet(), props, - checkpointStorageLocation, onCompletionPromise); trackPendingCheckpointStats(checkpoint); @@ -2025,18 +2038,6 @@ public class CheckpointCoordinator { } } - private static class CheckpointIdAndStorageLocation { - private final long checkpointId; - private final CheckpointStorageLocation checkpointStorageLocation; - - CheckpointIdAndStorageLocation( - long checkpointId, CheckpointStorageLocation checkpointStorageLocation) { - - this.checkpointId = checkpointId; - this.checkpointStorageLocation = checkNotNull(checkpointStorageLocation); - } - } - static class CheckpointTriggerRequest { final long timestamp; final CheckpointProperties props; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index f64af16..47e7a32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -105,12 +105,12 @@ public class PendingCheckpoint implements Checkpoint { /** The checkpoint properties. */ private final CheckpointProperties props; - /** Target storage location to persist the checkpoint metadata to. */ - private final CheckpointStorageLocation targetLocation; - /** The promise to fulfill once the checkpoint has been completed. */ private final CompletableFuture<CompletedCheckpoint> onCompletionPromise; + /** Target storage location to persist the checkpoint metadata to. */ + @Nullable private CheckpointStorageLocation targetLocation; + private int numAcknowledgedTasks; private boolean disposed; @@ -131,7 +131,6 @@ public class PendingCheckpoint implements Checkpoint { Collection<OperatorID> operatorCoordinatorsToConfirm, Collection<String> masterStateIdentifiers, CheckpointProperties props, - CheckpointStorageLocation targetLocation, CompletableFuture<CompletedCheckpoint> onCompletionPromise) { checkArgument( @@ -149,7 +148,6 @@ public class PendingCheckpoint implements Checkpoint { } this.props = checkNotNull(props); - this.targetLocation = checkNotNull(targetLocation); this.operatorStates = new HashMap<>(); this.masterStates = new ArrayList<>(masterStateIdentifiers.size()); @@ -186,6 +184,10 @@ public class PendingCheckpoint implements Checkpoint { return checkpointId; } + public void setCheckpointTargetLocation(CheckpointStorageLocation targetLocation) { + this.targetLocation = targetLocation; + } + public CheckpointStorageLocation getCheckpointStorageLocation() { return targetLocation; } @@ -599,7 +601,9 @@ public class PendingCheckpoint implements Checkpoint { // unregistered shared states are still considered private at this point. try { StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); - targetLocation.disposeOnFailure(); + if (targetLocation != null) { + targetLocation.disposeOnFailure(); + } } catch (Throwable t) { LOG.warn( "Could not properly dispose the private states in the pending checkpoint {} of job {}.", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 8c716db..f6f50b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -671,20 +671,30 @@ public class CheckpointCoordinatorTest extends TestLogger { /** Tests that do not trigger checkpoint when IOException occurred. */ @Test - public void testTriggerCheckpointAfterIOException() throws Exception { - // given: Checkpoint coordinator which fails on initializeLocationForCheckpoint. + public void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception { + // given: Checkpoint coordinator which fails on initializeCheckpointLocation. TestFailJobCallback failureCallback = new TestFailJobCallback(); + CheckpointStatsTracker statsTracker = + new CheckpointStatsTracker( + Integer.MAX_VALUE, + mock(CheckpointCoordinatorConfiguration.class), + new UnregisteredMetricsGroup()); CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() .setFailureManager(new CheckpointFailureManager(0, failureCallback)) .setCheckpointStorage(new IOExceptionCheckpointStorage()) .setTimer(manuallyTriggeredScheduledExecutor) + .setCheckpointStatsTracker(statsTracker) .build(); + // when: The checkpoint is triggered. testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION); // then: Failure manager should fail the job. assertEquals(1, failureCallback.getInvokeCounter()); + + // then: Should created PendingCheckpoint + assertNotNull(statsTracker.getPendingCheckpointStats(1)); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java index a178a4c..4ba15fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.WatermarkStrategyTest; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; @@ -724,6 +725,12 @@ public class CheckpointCoordinatorTestingUtils { private boolean allowCheckpointsAfterTasksFinished; + private CheckpointStatsTracker checkpointStatsTracker = + new CheckpointStatsTracker( + 1, + CheckpointCoordinatorConfiguration.builder().build(), + new WatermarkStrategyTest.DummyMetricGroup()); + public CheckpointCoordinatorBuilder setCheckpointCoordinatorConfiguration( CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration) { this.checkpointCoordinatorConfiguration = checkpointCoordinatorConfiguration; @@ -798,6 +805,12 @@ public class CheckpointCoordinatorTestingUtils { return this; } + public CheckpointCoordinatorBuilder setCheckpointStatsTracker( + CheckpointStatsTracker checkpointStatsTracker) { + this.checkpointStatsTracker = checkpointStatsTracker; + return this; + } + public CheckpointCoordinator build() throws Exception { if (executionGraph == null) { executionGraph = @@ -813,20 +826,24 @@ public class CheckpointCoordinatorTestingUtils { executionGraph.getVerticesTopologically(), allowCheckpointsAfterTasksFinished); - return new CheckpointCoordinator( - executionGraph.getJobID(), - checkpointCoordinatorConfiguration, - coordinatorsToCheckpoint, - checkpointIDCounter, - completedCheckpointStore, - checkpointStorage, - ioExecutor, - checkpointsCleaner, - timer, - sharedStateRegistryFactory, - failureManager, - checkpointPlanCalculator, - new ExecutionAttemptMappingProvider(executionGraph.getAllExecutionVertices())); + CheckpointCoordinator checkpointCoordinator = + new CheckpointCoordinator( + executionGraph.getJobID(), + checkpointCoordinatorConfiguration, + coordinatorsToCheckpoint, + checkpointIDCounter, + completedCheckpointStore, + checkpointStorage, + ioExecutor, + checkpointsCleaner, + timer, + sharedStateRegistryFactory, + failureManager, + checkpointPlanCalculator, + new ExecutionAttemptMappingProvider( + executionGraph.getAllExecutionVertices())); + checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker); + return checkpointCoordinator; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 7c2091f..3174e2b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -685,16 +685,18 @@ public class PendingCheckpointTest { 1024, 4096); - return new PendingCheckpoint( - new JobID(), - 0, - 1, - checkpointPlan, - operatorCoordinators, - masterStateIdentifiers, - props, - location, - new CompletableFuture<>()); + PendingCheckpoint pendingCheckpoint = + new PendingCheckpoint( + new JobID(), + 0, + 1, + checkpointPlan, + operatorCoordinators, + masterStateIdentifiers, + props, + new CompletableFuture<>()); + pendingCheckpoint.setCheckpointTargetLocation(location); + return pendingCheckpoint; } @SuppressWarnings("unchecked")
