This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 970932bfe207617be45568ea6ad5c6357a412fc7
Author: fanrui <[email protected]>
AuthorDate: Mon Feb 28 11:35:09 2022 +0800

    [FLINK-26049][checkpoint] initialize CheckpointLocation after creating PendingCheckpoint
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 95 +++++++++++-----------
 .../runtime/checkpoint/PendingCheckpoint.java      | 16 ++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 14 +++-
 .../CheckpointCoordinatorTestingUtils.java         | 45 ++++++----
 .../runtime/checkpoint/PendingCheckpointTest.java  | 22 ++---
 5 files changed, 113 insertions(+), 79 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index a2bff9d..384e047 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -541,13 +541,12 @@ public class CheckpointCoordinator {
                             .thenApplyAsync(
                                     plan -> {
                                         try {
-                                            CheckpointIdAndStorageLocation
-                                                    
checkpointIdAndStorageLocation =
-                                                            
initializeCheckpoint(
-                                                                    
request.props,
-                                                                    
request.externalSavepointLocation);
-                                            return new Tuple2<>(
-                                                    plan, 
checkpointIdAndStorageLocation);
+                                            // this must happen outside the 
coordinator-wide lock,
+                                            // because it communicates with 
external services
+                                            // (in HA mode) and may block for 
a while.
+                                            long checkpointID =
+                                                    
checkpointIdCounter.getAndIncrement();
+                                            return new Tuple2<>(plan, 
checkpointID);
                                         } catch (Throwable e) {
                                             throw new CompletionException(e);
                                         }
@@ -560,20 +559,41 @@ public class CheckpointCoordinator {
                                                     request.props,
                                                     checkpointInfo.f0,
                                                     request.isPeriodic,
-                                                    
checkpointInfo.f1.checkpointId,
-                                                    
checkpointInfo.f1.checkpointStorageLocation,
+                                                    checkpointInfo.f1,
                                                     
request.getOnCompletionFuture()),
                                     timer);
 
             final CompletableFuture<?> coordinatorCheckpointsComplete =
-                    pendingCheckpointCompletableFuture.thenComposeAsync(
-                            (pendingCheckpoint) ->
-                                    OperatorCoordinatorCheckpoints
-                                            
.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
-                                                    coordinatorsToCheckpoint,
-                                                    pendingCheckpoint,
-                                                    timer),
-                            timer);
+                    pendingCheckpointCompletableFuture
+                            .thenApplyAsync(
+                                    pendingCheckpoint -> {
+                                        try {
+                                            CheckpointStorageLocation 
checkpointStorageLocation =
+                                                    
initializeCheckpointLocation(
+                                                            
pendingCheckpoint.getCheckpointID(),
+                                                            request.props,
+                                                            
request.externalSavepointLocation);
+                                            return Tuple2.of(
+                                                    pendingCheckpoint, 
checkpointStorageLocation);
+                                        } catch (Throwable e) {
+                                            throw new CompletionException(e);
+                                        }
+                                    },
+                                    executor)
+                            .thenComposeAsync(
+                                    (checkpointInfo) -> {
+                                        PendingCheckpoint pendingCheckpoint = 
checkpointInfo.f0;
+                                        synchronized (lock) {
+                                            
pendingCheckpoint.setCheckpointTargetLocation(
+                                                    checkpointInfo.f1);
+                                        }
+                                        return OperatorCoordinatorCheckpoints
+                                                
.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
+                                                        
coordinatorsToCheckpoint,
+                                                        pendingCheckpoint,
+                                                        timer);
+                                    },
+                                    timer);
 
             // We have to take the snapshot of the master hooks after the 
coordinator checkpoints
             // has completed.
@@ -714,29 +734,24 @@ public class CheckpointCoordinator {
     }
 
     /**
-     * Initialize the checkpoint trigger asynchronously. It will expected to 
be executed in io
+     * Initialize the checkpoint location asynchronously. It will expected to 
be executed in io
      * thread due to it might be time-consuming.
      *
+     * @param checkpointID checkpoint id
      * @param props checkpoint properties
      * @param externalSavepointLocation the external savepoint location, it 
might be null
-     * @return the initialized result, checkpoint id and checkpoint location
+     * @return the checkpoint location
      */
-    private CheckpointIdAndStorageLocation initializeCheckpoint(
-            CheckpointProperties props, @Nullable String 
externalSavepointLocation)
+    private CheckpointStorageLocation initializeCheckpointLocation(
+            long checkpointID,
+            CheckpointProperties props,
+            @Nullable String externalSavepointLocation)
             throws Exception {
 
-        // this must happen outside the coordinator-wide lock, because it
-        // communicates
-        // with external services (in HA mode) and may block for a while.
-        long checkpointID = checkpointIdCounter.getAndIncrement();
-
-        CheckpointStorageLocation checkpointStorageLocation =
-                props.isSavepoint()
-                        ? checkpointStorageView.initializeLocationForSavepoint(
-                                checkpointID, externalSavepointLocation)
-                        : 
checkpointStorageView.initializeLocationForCheckpoint(checkpointID);
-
-        return new CheckpointIdAndStorageLocation(checkpointID, 
checkpointStorageLocation);
+        return props.isSavepoint()
+                ? checkpointStorageView.initializeLocationForSavepoint(
+                        checkpointID, externalSavepointLocation)
+                : 
checkpointStorageView.initializeLocationForCheckpoint(checkpointID);
     }
 
     private PendingCheckpoint createPendingCheckpoint(
@@ -745,7 +760,6 @@ public class CheckpointCoordinator {
             CheckpointPlan checkpointPlan,
             boolean isPeriodic,
             long checkpointID,
-            CheckpointStorageLocation checkpointStorageLocation,
             CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
 
         synchronized (lock) {
@@ -767,7 +781,6 @@ public class CheckpointCoordinator {
                         OperatorInfo.getIds(coordinatorsToCheckpoint),
                         masterHooks.keySet(),
                         props,
-                        checkpointStorageLocation,
                         onCompletionPromise);
 
         trackPendingCheckpointStats(checkpoint);
@@ -2025,18 +2038,6 @@ public class CheckpointCoordinator {
         }
     }
 
-    private static class CheckpointIdAndStorageLocation {
-        private final long checkpointId;
-        private final CheckpointStorageLocation checkpointStorageLocation;
-
-        CheckpointIdAndStorageLocation(
-                long checkpointId, CheckpointStorageLocation 
checkpointStorageLocation) {
-
-            this.checkpointId = checkpointId;
-            this.checkpointStorageLocation = 
checkNotNull(checkpointStorageLocation);
-        }
-    }
-
     static class CheckpointTriggerRequest {
         final long timestamp;
         final CheckpointProperties props;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index f64af16..47e7a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -105,12 +105,12 @@ public class PendingCheckpoint implements Checkpoint {
     /** The checkpoint properties. */
     private final CheckpointProperties props;
 
-    /** Target storage location to persist the checkpoint metadata to. */
-    private final CheckpointStorageLocation targetLocation;
-
     /** The promise to fulfill once the checkpoint has been completed. */
     private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
 
+    /** Target storage location to persist the checkpoint metadata to. */
+    @Nullable private CheckpointStorageLocation targetLocation;
+
     private int numAcknowledgedTasks;
 
     private boolean disposed;
@@ -131,7 +131,6 @@ public class PendingCheckpoint implements Checkpoint {
             Collection<OperatorID> operatorCoordinatorsToConfirm,
             Collection<String> masterStateIdentifiers,
             CheckpointProperties props,
-            CheckpointStorageLocation targetLocation,
             CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
 
         checkArgument(
@@ -149,7 +148,6 @@ public class PendingCheckpoint implements Checkpoint {
         }
 
         this.props = checkNotNull(props);
-        this.targetLocation = checkNotNull(targetLocation);
 
         this.operatorStates = new HashMap<>();
         this.masterStates = new ArrayList<>(masterStateIdentifiers.size());
@@ -186,6 +184,10 @@ public class PendingCheckpoint implements Checkpoint {
         return checkpointId;
     }
 
+    public void setCheckpointTargetLocation(CheckpointStorageLocation 
targetLocation) {
+        this.targetLocation = targetLocation;
+    }
+
     public CheckpointStorageLocation getCheckpointStorageLocation() {
         return targetLocation;
     }
@@ -599,7 +601,9 @@ public class PendingCheckpoint implements Checkpoint {
         // unregistered shared states are still considered private at this 
point.
         try {
             
StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-            targetLocation.disposeOnFailure();
+            if (targetLocation != null) {
+                targetLocation.disposeOnFailure();
+            }
         } catch (Throwable t) {
             LOG.warn(
                     "Could not properly dispose the private states in the 
pending checkpoint {} of job {}.",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 8c716db..f6f50b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -671,20 +671,30 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that do not trigger checkpoint when IOException occurred. */
     @Test
-    public void testTriggerCheckpointAfterIOException() throws Exception {
-        // given: Checkpoint coordinator which fails on 
initializeLocationForCheckpoint.
+    public void testTriggerCheckpointAfterCheckpointStorageIOException() 
throws Exception {
+        // given: Checkpoint coordinator which fails on 
initializeCheckpointLocation.
         TestFailJobCallback failureCallback = new TestFailJobCallback();
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE,
+                        mock(CheckpointCoordinatorConfiguration.class),
+                        new UnregisteredMetricsGroup());
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
                         .setFailureManager(new CheckpointFailureManager(0, 
failureCallback))
                         .setCheckpointStorage(new 
IOExceptionCheckpointStorage())
                         .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCheckpointStatsTracker(statsTracker)
                         .build();
+
         // when: The checkpoint is triggered.
         testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
 
         // then: Failure manager should fail the job.
         assertEquals(1, failureCallback.getInvokeCounter());
+
+        // then: Should created PendingCheckpoint
+        assertNotNull(statsTracker.getPendingCheckpointStats(1));
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index a178a4c..4ba15fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategyTest;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
@@ -724,6 +725,12 @@ public class CheckpointCoordinatorTestingUtils {
 
         private boolean allowCheckpointsAfterTasksFinished;
 
+        private CheckpointStatsTracker checkpointStatsTracker =
+                new CheckpointStatsTracker(
+                        1,
+                        CheckpointCoordinatorConfiguration.builder().build(),
+                        new WatermarkStrategyTest.DummyMetricGroup());
+
         public CheckpointCoordinatorBuilder 
setCheckpointCoordinatorConfiguration(
                 CheckpointCoordinatorConfiguration 
checkpointCoordinatorConfiguration) {
             this.checkpointCoordinatorConfiguration = 
checkpointCoordinatorConfiguration;
@@ -798,6 +805,12 @@ public class CheckpointCoordinatorTestingUtils {
             return this;
         }
 
+        public CheckpointCoordinatorBuilder setCheckpointStatsTracker(
+                CheckpointStatsTracker checkpointStatsTracker) {
+            this.checkpointStatsTracker = checkpointStatsTracker;
+            return this;
+        }
+
         public CheckpointCoordinator build() throws Exception {
             if (executionGraph == null) {
                 executionGraph =
@@ -813,20 +826,24 @@ public class CheckpointCoordinatorTestingUtils {
                             executionGraph.getVerticesTopologically(),
                             allowCheckpointsAfterTasksFinished);
 
-            return new CheckpointCoordinator(
-                    executionGraph.getJobID(),
-                    checkpointCoordinatorConfiguration,
-                    coordinatorsToCheckpoint,
-                    checkpointIDCounter,
-                    completedCheckpointStore,
-                    checkpointStorage,
-                    ioExecutor,
-                    checkpointsCleaner,
-                    timer,
-                    sharedStateRegistryFactory,
-                    failureManager,
-                    checkpointPlanCalculator,
-                    new 
ExecutionAttemptMappingProvider(executionGraph.getAllExecutionVertices()));
+            CheckpointCoordinator checkpointCoordinator =
+                    new CheckpointCoordinator(
+                            executionGraph.getJobID(),
+                            checkpointCoordinatorConfiguration,
+                            coordinatorsToCheckpoint,
+                            checkpointIDCounter,
+                            completedCheckpointStore,
+                            checkpointStorage,
+                            ioExecutor,
+                            checkpointsCleaner,
+                            timer,
+                            sharedStateRegistryFactory,
+                            failureManager,
+                            checkpointPlanCalculator,
+                            new ExecutionAttemptMappingProvider(
+                                    executionGraph.getAllExecutionVertices()));
+            
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
+            return checkpointCoordinator;
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 7c2091f..3174e2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -685,16 +685,18 @@ public class PendingCheckpointTest {
                         1024,
                         4096);
 
-        return new PendingCheckpoint(
-                new JobID(),
-                0,
-                1,
-                checkpointPlan,
-                operatorCoordinators,
-                masterStateIdentifiers,
-                props,
-                location,
-                new CompletableFuture<>());
+        PendingCheckpoint pendingCheckpoint =
+                new PendingCheckpoint(
+                        new JobID(),
+                        0,
+                        1,
+                        checkpointPlan,
+                        operatorCoordinators,
+                        masterStateIdentifiers,
+                        props,
+                        new CompletableFuture<>());
+        pendingCheckpoint.setCheckpointTargetLocation(location);
+        return pendingCheckpoint;
     }
 
     @SuppressWarnings("unchecked")

Reply via email to