This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ce2e062cdb1c7dfd7e641cd7f10de04440a0583
Author: fanrui <[email protected]>
AuthorDate: Mon Feb 28 11:35:09 2022 +0800

    [FLINK-26049][checkpoint] initialize CheckpointLocation after create PendingCheckpoint
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 86 +++++++++++-----------
 .../runtime/checkpoint/PendingCheckpoint.java      | 16 ++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 10 ++-
 .../runtime/checkpoint/PendingCheckpointTest.java  | 24 +++---
 4 files changed, 75 insertions(+), 61 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 475effc..4efff80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -545,14 +545,12 @@ public class CheckpointCoordinator {
                             .thenApplyAsync(
                                     plan -> {
                                         try {
-                                            CheckpointIdAndStorageLocation
-                                                    checkpointIdAndStorageLocation =
-                                                            initializeCheckpoint(
-                                                                    request.props,
-                                                                    request.externalSavepointLocation,
-                                                                    initializeBaseLocations);
-                                            return new Tuple2<>(
-                                                    plan, checkpointIdAndStorageLocation);
+                                            // this must happen outside the coordinator-wide lock,
+                                            // because it communicates with external services
+                                            // (in HA mode) and may block for a while.
+                                            long checkpointID =
+                                                    checkpointIdCounter.getAndIncrement();
+                                            return new Tuple2<>(plan, checkpointID);
                                         } catch (Throwable e) {
                                             throw new CompletionException(e);
                                         }
@@ -565,20 +563,42 @@ public class CheckpointCoordinator {
                                                     request.props,
                                                     checkpointInfo.f0,
                                                     request.isPeriodic,
-                                                    checkpointInfo.f1.checkpointId,
-                                                    checkpointInfo.f1.checkpointStorageLocation,
+                                                    checkpointInfo.f1,
                                                     request.getOnCompletionFuture()),
                                     timer);
 
             final CompletableFuture<?> coordinatorCheckpointsComplete =
-                    pendingCheckpointCompletableFuture.thenComposeAsync(
-                            (pendingCheckpoint) ->
-                                    OperatorCoordinatorCheckpoints
-                                            .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
-                                                    coordinatorsToCheckpoint,
-                                                    pendingCheckpoint,
-                                                    timer),
-                            timer);
+                    pendingCheckpointCompletableFuture
+                            .thenApplyAsync(
+                                    pendingCheckpoint -> {
+                                        try {
+                                            CheckpointStorageLocation checkpointStorageLocation =
+                                                    initializeCheckpointLocation(
+                                                            pendingCheckpoint.getCheckpointID(),
+                                                            request.props,
+                                                            request.externalSavepointLocation,
+                                                            initializeBaseLocations);
+                                            return Tuple2.of(
+                                                    pendingCheckpoint, checkpointStorageLocation);
+                                        } catch (Throwable e) {
+                                            throw new CompletionException(e);
+                                        }
+                                    },
+                                    executor)
+                            .thenComposeAsync(
+                                    (checkpointInfo) -> {
+                                        PendingCheckpoint pendingCheckpoint = checkpointInfo.f0;
+                                        synchronized (lock) {
+                                            pendingCheckpoint.setCheckpointTargetLocation(
+                                                    checkpointInfo.f1);
+                                        }
+                                        return OperatorCoordinatorCheckpoints
+                                                .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
+                                                        coordinatorsToCheckpoint,
+                                                        pendingCheckpoint,
+                                                        timer);
+                                    },
+                                    timer);
 
             // We have to take the snapshot of the master hooks after the coordinator checkpoints
             // has completed.
@@ -726,24 +746,20 @@ public class CheckpointCoordinator {
     }
 
     /**
-     * Initialize the checkpoint trigger asynchronously. It will expected to be executed in io
+     * Initialize the checkpoint location asynchronously. It will expected to be executed in io
      * thread due to it might be time-consuming.
      *
+     * @param checkpointID checkpoint id
      * @param props checkpoint properties
      * @param externalSavepointLocation the external savepoint location, it might be null
-     * @return the initialized result, checkpoint id and checkpoint location
+     * @return the checkpoint location
      */
-    private CheckpointIdAndStorageLocation initializeCheckpoint(
+    private CheckpointStorageLocation initializeCheckpointLocation(
+            long checkpointID,
             CheckpointProperties props,
             @Nullable String externalSavepointLocation,
             boolean initializeBaseLocations)
             throws Exception {
-
-        // this must happen outside the coordinator-wide lock, because it
-        // communicates
-        // with external services (in HA mode) and may block for a while.
-        long checkpointID = checkpointIdCounter.getAndIncrement();
-
         final CheckpointStorageLocation checkpointStorageLocation;
         if (props.isSavepoint()) {
             checkpointStorageLocation =
@@ -757,7 +773,7 @@ public class CheckpointCoordinator {
                     checkpointStorageView.initializeLocationForCheckpoint(checkpointID);
         }
 
-        return new CheckpointIdAndStorageLocation(checkpointID, checkpointStorageLocation);
+        return checkpointStorageLocation;
     }
 
     private PendingCheckpoint createPendingCheckpoint(
@@ -766,7 +782,6 @@ public class CheckpointCoordinator {
             CheckpointPlan checkpointPlan,
             boolean isPeriodic,
             long checkpointID,
-            CheckpointStorageLocation checkpointStorageLocation,
             CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
 
         synchronized (lock) {
@@ -791,7 +806,6 @@ public class CheckpointCoordinator {
                         OperatorInfo.getIds(coordinatorsToCheckpoint),
                         masterHooks.keySet(),
                         props,
-                        checkpointStorageLocation,
                         onCompletionPromise,
                         pendingCheckpointStats);
 
@@ -2161,18 +2175,6 @@ public class CheckpointCoordinator {
         }
     }
 
-    private static class CheckpointIdAndStorageLocation {
-        private final long checkpointId;
-        private final CheckpointStorageLocation checkpointStorageLocation;
-
-        CheckpointIdAndStorageLocation(
-                long checkpointId, CheckpointStorageLocation checkpointStorageLocation) {
-
-            this.checkpointId = checkpointId;
-            this.checkpointStorageLocation = checkNotNull(checkpointStorageLocation);
-        }
-    }
-
     static class CheckpointTriggerRequest {
         final long timestamp;
         final CheckpointProperties props;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 08cd23f..9229024 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -107,14 +107,14 @@ public class PendingCheckpoint implements Checkpoint {
     /** The checkpoint properties. */
     private final CheckpointProperties props;
 
-    /** Target storage location to persist the checkpoint metadata to. */
-    private final CheckpointStorageLocation targetLocation;
-
     /** The promise to fulfill once the checkpoint has been completed. */
     private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
 
     @Nullable private final PendingCheckpointStats pendingCheckpointStats;
 
+    /** Target storage location to persist the checkpoint metadata to. */
+    @Nullable private CheckpointStorageLocation targetLocation;
+
     private int numAcknowledgedTasks;
 
     private boolean disposed;
@@ -135,7 +135,6 @@ public class PendingCheckpoint implements Checkpoint {
             Collection<OperatorID> operatorCoordinatorsToConfirm,
             Collection<String> masterStateIdentifiers,
             CheckpointProperties props,
-            CheckpointStorageLocation targetLocation,
             CompletableFuture<CompletedCheckpoint> onCompletionPromise,
             @Nullable PendingCheckpointStats pendingCheckpointStats) {
         checkArgument(
@@ -153,7 +152,6 @@ public class PendingCheckpoint implements Checkpoint {
         }
 
         this.props = checkNotNull(props);
-        this.targetLocation = checkNotNull(targetLocation);
 
         this.operatorStates = new HashMap<>();
         this.masterStates = new ArrayList<>(masterStateIdentifiers.size());
@@ -191,6 +189,10 @@ public class PendingCheckpoint implements Checkpoint {
         return checkpointId;
     }
 
+    public void setCheckpointTargetLocation(CheckpointStorageLocation targetLocation) {
+        this.targetLocation = targetLocation;
+    }
+
     public CheckpointStorageLocation getCheckpointStorageLocation() {
         return targetLocation;
     }
@@ -645,7 +647,9 @@ public class PendingCheckpoint implements Checkpoint {
             // unregistered shared states are still considered private at this point.
             try {
                 StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-                targetLocation.disposeOnFailure();
+                if (targetLocation != null) {
+                    targetLocation.disposeOnFailure();
+                }
             } catch (Throwable t) {
                 LOG.warn(
                         "Could not properly dispose the private states in the pending checkpoint {} of job {}.",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3c09945..b493ef4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -727,11 +727,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that do not trigger checkpoint when IOException occurred. */
     @Test
-    public void testTriggerCheckpointAfterIOException() throws Exception {
-        // given: Checkpoint coordinator which fails on initializeLocationForCheckpoint.
+    public void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
+        // given: Checkpoint coordinator which fails on initializeCheckpointLocation.
         TestFailJobCallback failureCallback = new TestFailJobCallback();
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(statsTracker)
                         .setFailureManager(new CheckpointFailureManager(0, failureCallback))
                         .setCheckpointStorage(new IOExceptionCheckpointStorage())
                         .setTimer(manuallyTriggeredScheduledExecutor)
@@ -741,6 +744,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // then: Failure manager should fail the job.
         assertEquals(1, failureCallback.getInvokeCounter());
+
+        // then: Should created PendingCheckpoint
+        assertNotNull(statsTracker.getPendingCheckpointStats(1));
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 799d743..a0b07b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -634,17 +634,19 @@ public class PendingCheckpointTest {
                         1024,
                         4096);
 
-        return new PendingCheckpoint(
-                new JobID(),
-                0,
-                1,
-                checkpointPlan,
-                operatorCoordinators,
-                masterStateIdentifiers,
-                props,
-                location,
-                new CompletableFuture<>(),
-                null);
+        PendingCheckpoint pendingCheckpoint =
+                new PendingCheckpoint(
+                        new JobID(),
+                        0,
+                        1,
+                        checkpointPlan,
+                        operatorCoordinators,
+                        masterStateIdentifiers,
+                        props,
+                        new CompletableFuture<>(),
+                        null);
+        pendingCheckpoint.setCheckpointTargetLocation(location);
+        return pendingCheckpoint;
     }
 
     @SuppressWarnings("unchecked")

Reply via email to