This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc419b5639f68bcb0b773763f24179dd3536d713
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Feb 23 16:45:26 2022 +0100

    [FLINK-25958][runtime] Mark CompletedCheckpoint as discarded before it is actually discarded, to avoid synchronization when changing the discarded flag
---
 .../flink/runtime/checkpoint/Checkpoint.java       |  14 ++-
 .../runtime/checkpoint/CheckpointsCleaner.java     |  20 +---
 .../runtime/checkpoint/CompletedCheckpoint.java    | 125 ++++++++++-----------
 .../checkpoint/CompletedCheckpointStats.java       |   4 +-
 .../EmbeddedCompletedCheckpointStore.java          |   4 +-
 .../runtime/checkpoint/PendingCheckpoint.java      |  69 +++++++-----
 .../StandaloneCompletedCheckpointStore.java        |  10 +-
 .../checkpoint/CompletedCheckpointStoreTest.java   |  46 +++-----
 .../checkpoint/CompletedCheckpointTest.java        |   8 +-
 .../StandaloneCompletedCheckpointStoreTest.java    |   2 +-
 10 files changed, 153 insertions(+), 149 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java
index 4e654ba..b897287 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java
@@ -19,7 +19,19 @@ package org.apache.flink.runtime.checkpoint;
 
 /** A checkpoint, pending or completed. */
 public interface Checkpoint {
+    DiscardObject NOOP_DISCARD_OBJECT = () -> {};
+
     long getCheckpointID();
 
-    void discard() throws Exception;
+    /**
+     * This method precedes the {@link DiscardObject#discard()} method and should be called from the
+     * {@link CheckpointCoordinator} (under the lock) while {@link DiscardObject#discard()} can be
+     * called from any thread/place.
+     */
+    DiscardObject markAsDiscarded();
+
+    /** Extra interface for discarding the checkpoint. */
+    interface DiscardObject {
+        void discard() throws Exception;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
index 506cb6d..08161dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
@@ -59,24 +59,16 @@ public class CheckpointsCleaner implements Serializable, AutoCloseableAsync {
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        cleanup(
-                checkpoint,
-                () -> {
-                    if (shouldDiscard) {
-                        checkpoint.discard();
-                    }
-                },
-                postCleanAction,
-                executor);
+        Checkpoint.DiscardObject discardObject =
+                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
+
+        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
     public void cleanCheckpointOnFailedStoring(
             CompletedCheckpoint completedCheckpoint, Executor executor) {
-        cleanup(
-                completedCheckpoint,
-                completedCheckpoint::discardOnFailedStoring,
-                () -> {},
-                executor);
+        Checkpoint.DiscardObject discardObject = completedCheckpoint.markAsDiscarded();
+        cleanup(completedCheckpoint, discardObject::discard, () -> {}, executor);
     }
 
     private void cleanup(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 841e543..41c5774 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -43,6 +44,7 @@ import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A CompletedCheckpoint describes a checkpoint after all required tasks acknowledged it (with their
@@ -66,6 +68,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * metadata file. For a state backend that stores metadata in database tables, the pointer could be
  * the table name and row key. The pointer is encoded as a String.
  */
+@NotThreadSafe
 public class CompletedCheckpoint implements Serializable, Checkpoint {
 
     private static final Logger LOG = LoggerFactory.getLogger(CompletedCheckpoint.class);
@@ -216,77 +219,20 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
     //  Discard and Dispose
     // ------------------------------------------------------------------------
 
-    public void discardOnFailedStoring() throws Exception {
-        discard();
-    }
-
-    public boolean discardOnSubsume() throws Exception {
-        if (shouldBeDiscardedOnSubsume()) {
-            discard();
-            return true;
+    public DiscardObject markAsDiscarded() {
+        if (completedCheckpointStats != null) {
+            completedCheckpointStats.discard();
         }
 
-        return false;
-    }
-
-    public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {
-
-        if (shouldBeDiscardedOnShutdown(jobStatus)) {
-
-            discard();
-            return true;
-        } else {
-            LOG.info("Checkpoint with ID {} at '{}' not discarded.", checkpointID, externalPointer);
-            return false;
-        }
+        return new CompletedCheckpointDiscardObject();
     }
 
-    @Override
-    public void discard() throws Exception {
-        LOG.trace("Executing discard procedure for {}.", this);
-
-        try {
-            // collect exceptions and continue cleanup
-            Exception exception = null;
-
-            // drop the metadata
-            try {
-                metadataHandle.discardState();
-            } catch (Exception e) {
-                exception = e;
-            }
-
-            // discard private state objects
-            try {
-                StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-            } catch (Exception e) {
-                exception = ExceptionUtils.firstOrSuppressed(e, exception);
-            }
-
-            // discard location as a whole
-            try {
-                storageLocation.disposeStorageLocation();
-            } catch (Exception e) {
-                exception = ExceptionUtils.firstOrSuppressed(e, exception);
-            }
-
-            if (exception != null) {
-                throw exception;
-            }
-        } finally {
-            operatorStates.clear();
-
-            if (completedCheckpointStats != null) {
-                completedCheckpointStats.discard();
-            }
-        }
+    public DiscardObject markAsDiscardedOnSubsume() {
+        return shouldBeDiscardedOnSubsume() ? markAsDiscarded() : NOOP_DISCARD_OBJECT;
     }
 
-    /** NOT Thread safe. This method can be called only from CheckpointCoordinator thread. */
-    public void markAsDiscarded() {
-        if (completedCheckpointStats != null) {
-            completedCheckpointStats.discard();
-        }
+    public DiscardObject markAsDiscardedOnShutdown(JobStatus jobStatus) {
+        return shouldBeDiscardedOnShutdown(jobStatus) ? markAsDiscarded() : NOOP_DISCARD_OBJECT;
     }
 
     public boolean shouldBeDiscardedOnSubsume() {
@@ -338,4 +284,53 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
                 "%s %d @ %d for %s located at %s",
                 props.getCheckpointType().getName(), checkpointID, timestamp, job, externalPointer);
     }
+
+    /** Implementation of {@link org.apache.flink.runtime.checkpoint.Checkpoint.DiscardObject}. */
+    @NotThreadSafe
+    public class CompletedCheckpointDiscardObject implements DiscardObject {
+
+        @Override
+        public void discard() throws Exception {
+            LOG.trace("Executing discard procedure for {}.", CompletedCheckpoint.this);
+            checkState(
+                    isMarkedAsDiscarded(),
+                    "Checkpoint should be marked as discarded before discard.");
+
+            try {
+                // collect exceptions and continue cleanup
+                Exception exception = null;
+
+                // drop the metadata
+                try {
+                    metadataHandle.discardState();
+                } catch (Exception e) {
+                    exception = e;
+                }
+
+                // discard private state objects
+                try {
+                    StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+                } catch (Exception e) {
+                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
+                }
+
+                // discard location as a whole
+                try {
+                    storageLocation.disposeStorageLocation();
+                } catch (Exception e) {
+                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
+                }
+
+                if (exception != null) {
+                    throw exception;
+                }
+            } finally {
+                operatorStates.clear();
+            }
+        }
+
+        private boolean isMarkedAsDiscarded() {
+            return completedCheckpointStats == null || completedCheckpointStats.isDiscarded();
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
index 88339ac..06cc8a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -182,9 +182,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
         return discarded;
     }
 
-    /**
-     * Mark the checkpoint has been discarded.
-     */
+    /** Marks the checkpoint as discarded. */
     void discard() {
         discarded = true;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index 653ecf1..1e5e47c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -90,7 +90,7 @@ public class EmbeddedCompletedCheckpointStore extends AbstractCompleteCheckpoint
                 CheckpointSubsumeHelper.subsume(
                                 checkpoints,
                                 maxRetainedCheckpoints,
-                                CompletedCheckpoint::discardOnSubsume)
+                                cc -> cc.markAsDiscardedOnSubsume().discard())
                         .orElse(null);
 
         unregisterUnusedState(checkpoints);
@@ -101,7 +101,7 @@ public class EmbeddedCompletedCheckpointStore extends AbstractCompleteCheckpoint
     @VisibleForTesting
     void removeOldestCheckpoint() throws Exception {
         CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-        checkpointToSubsume.discardOnSubsume();
+        checkpointToSubsume.markAsDiscardedOnSubsume().discard();
         unregisterUnusedState(checkpoints);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index aee95dd..76c6a44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -64,6 +65,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>Note that the pending checkpoint, as well as the successful checkpoint keep the state handles
  * always as serialized values, never as actual values.
  */
+@NotThreadSafe
 public class PendingCheckpoint implements Checkpoint {
 
     /** Result of the {@link PendingCheckpoint#acknowledgedTasks} method. */
@@ -591,35 +593,9 @@ public class PendingCheckpoint implements Checkpoint {
         }
     }
 
-    /**
-     * Discard state. Must be called after {@link #dispose(boolean, CheckpointsCleaner, Runnable,
-     * Executor) dispose}.
-     */
     @Override
-    public void discard() {
-        synchronized (lock) {
-            if (discarded) {
-                Preconditions.checkState(
-                        disposed, "Checkpoint should be disposed before being discarded");
-                return;
-            } else {
-                discarded = true;
-            }
-        }
-        // discard the private states.
-        // unregistered shared states are still considered private at this point.
-        try {
-            StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-            targetLocation.disposeOnFailure();
-        } catch (Throwable t) {
-            LOG.warn(
-                    "Could not properly dispose the private states in the pending checkpoint {} of job {}.",
-                    checkpointId,
-                    jobId,
-                    t);
-        } finally {
-            operatorStates.clear();
-        }
+    public DiscardObject markAsDiscarded() {
+        return new PendingCheckpointDiscardObject();
     }
 
     private void cancelCanceller() {
@@ -660,4 +636,41 @@ public class PendingCheckpoint implements Checkpoint {
                 getNumberOfAcknowledgedTasks(),
                 getNumberOfNonAcknowledgedTasks());
     }
+
+    /**
+     * Implementation of {@link org.apache.flink.runtime.checkpoint.Checkpoint.DiscardObject} for
+     * {@link PendingCheckpoint}.
+     */
+    public class PendingCheckpointDiscardObject implements DiscardObject {
+        /**
+         * Discard state. Must be called after {@link #dispose(boolean, CheckpointsCleaner,
+         * Runnable, Executor) dispose}.
+         */
+        @Override
+        public void discard() {
+            synchronized (lock) {
+                if (discarded) {
+                    Preconditions.checkState(
+                            disposed, "Checkpoint should be disposed before being discarded");
+                    return;
+                } else {
+                    discarded = true;
+                }
+            }
+            // discard the private states.
+            // unregistered shared states are still considered private at this point.
+            try {
+                StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+                targetLocation.disposeOnFailure();
+            } catch (Throwable t) {
+                LOG.warn(
+                        "Could not properly dispose the private states in the pending checkpoint {} of job {}.",
+                        checkpointId,
+                        jobId,
+                        t);
+            } finally {
+                operatorStates.clear();
+            }
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 9a1d1095..87a6486 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -101,7 +101,7 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompleteCheckpoi
                 CheckpointSubsumeHelper.subsume(
                                 checkpoints,
                                 maxNumberOfCheckpointsToRetain,
-                                CompletedCheckpoint::discardOnSubsume)
+                                cc -> cc.markAsDiscardedOnSubsume().discard())
                         .orElse(null);
 
         unregisterUnusedState(checkpoints);
@@ -133,7 +133,13 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompleteCheckpoi
 
             long lowestRetained = Long.MAX_VALUE;
             for (CompletedCheckpoint checkpoint : checkpoints) {
-                if (!checkpoint.discardOnShutdown(jobStatus)) {
+                if (checkpoint.shouldBeDiscardedOnShutdown(jobStatus)) {
+                    checkpoint.markAsDiscardedOnShutdown(jobStatus).discard();
+                } else {
+                    LOG.info(
+                            "Checkpoint with ID {} at '{}' not discarded.",
+                            checkpoint.getCheckpointID(),
+                            checkpoint.getExternalPointer());
                     lowestRetained = Math.min(checkpoint.getCheckpointID(), lowestRetained);
                 }
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 62e5479..39891d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -312,35 +312,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
         }
 
         @Override
-        public boolean discardOnSubsume() throws Exception {
-            if (super.discardOnSubsume()) {
-                discard();
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-        @Override
-        public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {
-            if (super.discardOnShutdown(jobStatus)) {
-                discard();
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-        @Override
-        public void discard() throws Exception {
-            super.discard();
-            if (!isDiscarded) {
-                this.isDiscarded = true;
-
-                if (discardLatch != null) {
-                    discardLatch.countDown();
-                }
-            }
+        public CompletedCheckpointDiscardObject markAsDiscarded() {
+            return new TestCompletedCheckpointDiscardObject();
         }
 
         public boolean isDiscarded() {
@@ -376,6 +349,21 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
         public int hashCode() {
             return getJobId().hashCode() + (int) getCheckpointID();
         }
+
+        /** Discard object that also flags the checkpoint and counts down the discard latch. */
+        public class TestCompletedCheckpointDiscardObject extends CompletedCheckpointDiscardObject {
+            @Override
+            public void discard() throws Exception {
+                super.discard();
+                if (!isDiscarded) {
+                    isDiscarded = true;
+
+                    if (discardLatch != null) {
+                        discardLatch.countDown();
+                    }
+                }
+            }
+        }
     }
 
     public static class TestOperatorSubtaskState extends OperatorSubtaskState {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 3a29452..b6461a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -271,7 +271,7 @@ public class CompletedCheckpointTest {
         verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L);
 
         // Subsume
-        checkpoint.discardOnSubsume();
+        checkpoint.markAsDiscardedOnSubsume().discard();
 
         verify(state, times(1)).discardState();
         assertTrue(location.isDisposed());
@@ -319,7 +319,7 @@ public class CompletedCheckpointTest {
                             retainedLocation,
                             null);
 
-            checkpoint.discardOnShutdown(status);
+            checkpoint.markAsDiscardedOnShutdown(status).discard();
 
             verify(state, times(0)).discardState();
             assertFalse(retainedLocation.isDisposed());
@@ -347,7 +347,7 @@ public class CompletedCheckpointTest {
                             discardLocation,
                             null);
 
-            checkpoint.discardOnShutdown(status);
+            checkpoint.markAsDiscardedOnShutdown(status).discard();
 
             verify(state, times(1)).discardState();
             assertTrue(discardLocation.isDisposed());
@@ -388,7 +388,7 @@ public class CompletedCheckpointTest {
                         new TestCompletedCheckpointStorageLocation(),
                         checkpointStats);
 
-        completed.discardOnShutdown(JobStatus.FINISHED);
+        completed.markAsDiscardedOnShutdown(JobStatus.FINISHED).discard();
         assertTrue(checkpointStats.isDiscarded());
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 9a584c9..f30df4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -110,7 +110,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
                             new TestCompletedCheckpointStorageLocation(),
                             null) {
                         @Override
-                        public boolean discardOnSubsume() {
+                        public CompletedCheckpointDiscardObject markAsDiscardedOnSubsume() {
                             discardAttempted.countDown();
                             throw new RuntimeException();
                         }

Reply via email to