This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dc419b5639f68bcb0b773763f24179dd3536d713 Author: Anton Kalashnikov <[email protected]> AuthorDate: Wed Feb 23 16:45:26 2022 +0100 [FLINK-25958][runtime] Mark CompletedCheckpoint as discarded before it will be really discarded in order to avoid synchronization for changing discarded flag --- .../flink/runtime/checkpoint/Checkpoint.java | 14 ++- .../runtime/checkpoint/CheckpointsCleaner.java | 20 +--- .../runtime/checkpoint/CompletedCheckpoint.java | 125 ++++++++++----------- .../checkpoint/CompletedCheckpointStats.java | 4 +- .../EmbeddedCompletedCheckpointStore.java | 4 +- .../runtime/checkpoint/PendingCheckpoint.java | 69 +++++++----- .../StandaloneCompletedCheckpointStore.java | 10 +- .../checkpoint/CompletedCheckpointStoreTest.java | 46 +++----- .../checkpoint/CompletedCheckpointTest.java | 8 +- .../StandaloneCompletedCheckpointStoreTest.java | 2 +- 10 files changed, 153 insertions(+), 149 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java index 4e654ba..b897287 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java @@ -19,7 +19,19 @@ package org.apache.flink.runtime.checkpoint; /** A checkpoint, pending or completed. */ public interface Checkpoint { + DiscardObject NOOP_DISCARD_OBJECT = () -> {}; + long getCheckpointID(); - void discard() throws Exception; + /** + * This method precedes the {@link DiscardObject#discard()} method and should be called from the + * {@link CheckpointCoordinator} (under the lock) while {@link DiscardObject#discard()} can be + * called from any thread/place. + */ + DiscardObject markAsDiscarded(); + + /** Extra interface for discarding the checkpoint.
*/ + interface DiscardObject { + void discard() throws Exception; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java index 506cb6d..08161dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java @@ -59,24 +59,16 @@ public class CheckpointsCleaner implements Serializable, AutoCloseableAsync { boolean shouldDiscard, Runnable postCleanAction, Executor executor) { - cleanup( - checkpoint, - () -> { - if (shouldDiscard) { - checkpoint.discard(); - } - }, - postCleanAction, - executor); + Checkpoint.DiscardObject discardObject = + shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT; + + cleanup(checkpoint, discardObject::discard, postCleanAction, executor); } public void cleanCheckpointOnFailedStoring( CompletedCheckpoint completedCheckpoint, Executor executor) { - cleanup( - completedCheckpoint, - completedCheckpoint::discardOnFailedStoring, - () -> {}, - executor); + Checkpoint.DiscardObject discardObject = completedCheckpoint.markAsDiscarded(); + cleanup(completedCheckpoint, discardObject::discard, () -> {}, executor); } private void cleanup( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 841e543..41c5774 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; import java.io.Serializable; import java.util.ArrayList; 
@@ -43,6 +44,7 @@ import java.util.Map; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * A CompletedCheckpoint describes a checkpoint after all required tasks acknowledged it (with their @@ -66,6 +68,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * metadata file. For a state backend that stores metadata in database tables, the pointer could be * the table name and row key. The pointer is encoded as a String. */ +@NotThreadSafe public class CompletedCheckpoint implements Serializable, Checkpoint { private static final Logger LOG = LoggerFactory.getLogger(CompletedCheckpoint.class); @@ -216,77 +219,20 @@ public class CompletedCheckpoint implements Serializable, Checkpoint { // Discard and Dispose // ------------------------------------------------------------------------ - public void discardOnFailedStoring() throws Exception { - discard(); - } - - public boolean discardOnSubsume() throws Exception { - if (shouldBeDiscardedOnSubsume()) { - discard(); - return true; + public DiscardObject markAsDiscarded() { + if (completedCheckpointStats != null) { + completedCheckpointStats.discard(); } - return false; - } - - public boolean discardOnShutdown(JobStatus jobStatus) throws Exception { - - if (shouldBeDiscardedOnShutdown(jobStatus)) { - - discard(); - return true; - } else { - LOG.info("Checkpoint with ID {} at '{}' not discarded.", checkpointID, externalPointer); - return false; - } + return new CompletedCheckpointDiscardObject(); } - @Override - public void discard() throws Exception { - LOG.trace("Executing discard procedure for {}.", this); - - try { - // collect exceptions and continue cleanup - Exception exception = null; - - // drop the metadata - try { - metadataHandle.discardState(); - } catch (Exception e) { - exception = e; - } - - // discard private state objects - try { - 
StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - // discard location as a whole - try { - storageLocation.disposeStorageLocation(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - throw exception; - } - } finally { - operatorStates.clear(); - - if (completedCheckpointStats != null) { - completedCheckpointStats.discard(); - } - } + public DiscardObject markAsDiscardedOnSubsume() { + return shouldBeDiscardedOnSubsume() ? markAsDiscarded() : NOOP_DISCARD_OBJECT; } - /** NOT Thread safe. This method can be called only from CheckpointCoordinator thread. */ - public void markAsDiscarded() { - if (completedCheckpointStats != null) { - completedCheckpointStats.discard(); - } + public DiscardObject markAsDiscardedOnShutdown(JobStatus jobStatus) { + return shouldBeDiscardedOnShutdown(jobStatus) ? markAsDiscarded() : NOOP_DISCARD_OBJECT; } public boolean shouldBeDiscardedOnSubsume() { @@ -338,4 +284,53 @@ public class CompletedCheckpoint implements Serializable, Checkpoint { "%s %d @ %d for %s located at %s", props.getCheckpointType().getName(), checkpointID, timestamp, job, externalPointer); } + + /** Implementation of {@link org.apache.flink.runtime.checkpoint.Checkpoint.DiscardObject}. 
*/ + @NotThreadSafe + public class CompletedCheckpointDiscardObject implements DiscardObject { + + @Override + public void discard() throws Exception { + LOG.trace("Executing discard procedure for {}.", this); + checkState( + isMarkedAsDiscarded(), + "Checkpoint should be marked as discarded before discard."); + + try { + // collect exceptions and continue cleanup + Exception exception = null; + + // drop the metadata + try { + metadataHandle.discardState(); + } catch (Exception e) { + exception = e; + } + + // discard private state objects + try { + StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + // discard location as a whole + try { + storageLocation.disposeStorageLocation(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + throw exception; + } + } finally { + operatorStates.clear(); + } + } + + private boolean isMarkedAsDiscarded() { + return completedCheckpointStats == null || completedCheckpointStats.isDiscarded(); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java index 88339ac..06cc8a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java @@ -182,9 +182,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { return discarded; } - /** - * Mark the checkpoint has been discarded. - */ + /** Mark the checkpoint has been discarded. 
*/ void discard() { discarded = true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java index 653ecf1..1e5e47c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java @@ -90,7 +90,7 @@ public class EmbeddedCompletedCheckpointStore extends AbstractCompleteCheckpoint CheckpointSubsumeHelper.subsume( checkpoints, maxRetainedCheckpoints, - CompletedCheckpoint::discardOnSubsume) + cc -> cc.markAsDiscardedOnSubsume().discard()) .orElse(null); unregisterUnusedState(checkpoints); @@ -101,7 +101,7 @@ public class EmbeddedCompletedCheckpointStore extends AbstractCompleteCheckpoint @VisibleForTesting void removeOldestCheckpoint() throws Exception { CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); - checkpointToSubsume.discardOnSubsume(); + checkpointToSubsume.markAsDiscardedOnSubsume().discard(); unregisterUnusedState(checkpoints); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index aee95dd..76c6a44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -38,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; import java.util.ArrayList; @@ -64,6 +65,7 @@ import static org.apache.flink.util.Preconditions.checkState; * <p>Note that the pending checkpoint, as well as the successful checkpoint keep the state handles * 
always as serialized values, never as actual values. */ +@NotThreadSafe public class PendingCheckpoint implements Checkpoint { /** Result of the {@link PendingCheckpoint#acknowledgedTasks} method. */ @@ -591,35 +593,9 @@ public class PendingCheckpoint implements Checkpoint { } } - /** - * Discard state. Must be called after {@link #dispose(boolean, CheckpointsCleaner, Runnable, - * Executor) dispose}. - */ @Override - public void discard() { - synchronized (lock) { - if (discarded) { - Preconditions.checkState( - disposed, "Checkpoint should be disposed before being discarded"); - return; - } else { - discarded = true; - } - } - // discard the private states. - // unregistered shared states are still considered private at this point. - try { - StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); - targetLocation.disposeOnFailure(); - } catch (Throwable t) { - LOG.warn( - "Could not properly dispose the private states in the pending checkpoint {} of job {}.", - checkpointId, - jobId, - t); - } finally { - operatorStates.clear(); - } + public DiscardObject markAsDiscarded() { + return new PendingCheckpointDiscardObject(); } private void cancelCanceller() { @@ -660,4 +636,41 @@ public class PendingCheckpoint implements Checkpoint { getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks()); } + + /** + * Implementation of {@link org.apache.flink.runtime.checkpoint.Checkpoint.DiscardObject} for + * {@link PendingCheckpoint}. + */ + public class PendingCheckpointDiscardObject implements DiscardObject { + /** + * Discard state. Must be called after {@link #dispose(boolean, CheckpointsCleaner, + * Runnable, Executor) dispose}. + */ + @Override + public void discard() { + synchronized (lock) { + if (discarded) { + Preconditions.checkState( + disposed, "Checkpoint should be disposed before being discarded"); + return; + } else { + discarded = true; + } + } + // discard the private states. 
+ // unregistered shared states are still considered private at this point. + try { + StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); + targetLocation.disposeOnFailure(); + } catch (Throwable t) { + LOG.warn( + "Could not properly dispose the private states in the pending checkpoint {} of job {}.", + checkpointId, + jobId, + t); + } finally { + operatorStates.clear(); + } + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 9a1d1095..87a6486 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -101,7 +101,7 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompleteCheckpoi CheckpointSubsumeHelper.subsume( checkpoints, maxNumberOfCheckpointsToRetain, - CompletedCheckpoint::discardOnSubsume) + cc -> cc.markAsDiscardedOnSubsume().discard()) .orElse(null); unregisterUnusedState(checkpoints); @@ -133,7 +133,13 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompleteCheckpoi long lowestRetained = Long.MAX_VALUE; for (CompletedCheckpoint checkpoint : checkpoints) { - if (!checkpoint.discardOnShutdown(jobStatus)) { + if (checkpoint.shouldBeDiscardedOnShutdown(jobStatus)) { + checkpoint.markAsDiscardedOnShutdown(jobStatus).discard(); + } else { + LOG.info( + "Checkpoint with ID {} at '{}' not discarded.", + checkpoint.getCheckpointID(), + checkpoint.getExternalPointer()); lowestRetained = Math.min(checkpoint.getCheckpointID(), lowestRetained); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 
62e5479..39891d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -312,35 +312,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { } @Override - public boolean discardOnSubsume() throws Exception { - if (super.discardOnSubsume()) { - discard(); - return true; - } else { - return false; - } - } - - @Override - public boolean discardOnShutdown(JobStatus jobStatus) throws Exception { - if (super.discardOnShutdown(jobStatus)) { - discard(); - return true; - } else { - return false; - } - } - - @Override - public void discard() throws Exception { - super.discard(); - if (!isDiscarded) { - this.isDiscarded = true; - - if (discardLatch != null) { - discardLatch.countDown(); - } - } + public CompletedCheckpointDiscardObject markAsDiscarded() { + return new TestCompletedCheckpointDiscardObject(); } public boolean isDiscarded() { @@ -376,6 +349,21 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { public int hashCode() { return getJobId().hashCode() + (int) getCheckpointID(); } + + /** */ + public class TestCompletedCheckpointDiscardObject extends CompletedCheckpointDiscardObject { + @Override + public void discard() throws Exception { + super.discard(); + if (!isDiscarded) { + isDiscarded = true; + + if (discardLatch != null) { + discardLatch.countDown(); + } + } + } + } } public static class TestOperatorSubtaskState extends OperatorSubtaskState { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 3a29452..b6461a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -271,7 +271,7 @@ public class CompletedCheckpointTest { verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L); // Subsume - checkpoint.discardOnSubsume(); + checkpoint.markAsDiscardedOnSubsume().discard(); verify(state, times(1)).discardState(); assertTrue(location.isDisposed()); @@ -319,7 +319,7 @@ public class CompletedCheckpointTest { retainedLocation, null); - checkpoint.discardOnShutdown(status); + checkpoint.markAsDiscardedOnShutdown(status).discard(); verify(state, times(0)).discardState(); assertFalse(retainedLocation.isDisposed()); @@ -347,7 +347,7 @@ public class CompletedCheckpointTest { discardLocation, null); - checkpoint.discardOnShutdown(status); + checkpoint.markAsDiscardedOnShutdown(status).discard(); verify(state, times(1)).discardState(); assertTrue(discardLocation.isDisposed()); @@ -388,7 +388,7 @@ public class CompletedCheckpointTest { new TestCompletedCheckpointStorageLocation(), checkpointStats); - completed.discardOnShutdown(JobStatus.FINISHED); + completed.markAsDiscardedOnShutdown(JobStatus.FINISHED).discard(); assertTrue(checkpointStats.isDiscarded()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java index 9a584c9..f30df4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java @@ -110,7 +110,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS new TestCompletedCheckpointStorageLocation(), null) { @Override - public boolean discardOnSubsume() { + public CompletedCheckpointDiscardObject markAsDiscardedOnSubsume() { 
discardAttempted.countDown(); throw new RuntimeException(); }
