rkhachatryan commented on a change in pull request #13040:
URL: https://github.com/apache/flink/pull/13040#discussion_r491905895
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java ##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.StateUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Delegate class responsible for checkpoints cleaning and counting the number of checkpoints yet
+ * to clean.
+ */
+public class CheckpointsCleaner implements Serializable{
+	private final AtomicInteger numberOfCheckpointsToClean;
+	private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class);
+
+	public CheckpointsCleaner() {
+		this.numberOfCheckpointsToClean = new AtomicInteger(0);
+	}
+
+	int getNumberOfCheckpointsToClean() {
+		return numberOfCheckpointsToClean.get();
+	}
+
+	public void cleanCheckpoint(Runnable cleanAction, Runnable postCleanAction, Executor executor) {
+		numberOfCheckpointsToClean.incrementAndGet();
+		executor.execute(() -> {
+			try {
+				cleanAction.run();
+			} finally {
+				numberOfCheckpointsToClean.decrementAndGet();
+				postCleanAction.run();
+			}
+		});
+	}
+
+	public void cleanStates(Runnable postCleanAction, Map<OperatorID, OperatorState> operatorStates, PendingCheckpoint pendingCheckpoint, CheckpointStorageLocation targetLocation, Executor executor){
+		numberOfCheckpointsToClean.incrementAndGet();
+		executor.execute(() -> {
+			// discard the private states.
+			// unregistered shared states are still considered private at this point.
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+				targetLocation.disposeOnFailure();

Review comment:
I think this logic should belong to the `PendingCheckpoint` (as well as the `LOG.warn` and `map.clear`), as it accesses internals of the `PendingCheckpoint`. In the original version, this was the case. And if we use that original Runnable, we can even get rid of `cleanStates` and have only `cleanCheckpoint`. WDYT?
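To make that suggestion concrete, here is a minimal, hedged sketch (simplified stand-in types, not the PR's actual classes) in which the checkpoint builds its own clean-up `Runnable` and hands it to the generic `cleanCheckpoint`, so a dedicated `cleanStates` method would no longer be needed:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: the cleaner counts and schedules work, nothing more.
class CheckpointsCleanerSketch {
	private final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger(0);

	int getNumberOfCheckpointsToClean() {
		return numberOfCheckpointsToClean.get();
	}

	void cleanCheckpoint(Runnable cleanAction, Runnable postCleanAction, Executor executor) {
		numberOfCheckpointsToClean.incrementAndGet();
		executor.execute(() -> {
			try {
				cleanAction.run();
			} finally {
				numberOfCheckpointsToClean.decrementAndGet();
				postCleanAction.run();
			}
		});
	}
}

// Sketch only: a stand-in for PendingCheckpoint that keeps its internals private
// and exposes them to the cleaner solely as a Runnable.
class PendingCheckpointSketch {
	private final Runnable discardPrivateState; // would wrap state discard, LOG.warn, map.clear

	PendingCheckpointSketch(Runnable discardPrivateState) {
		this.discardPrivateState = discardPrivateState;
	}

	void dispose(CheckpointsCleanerSketch cleaner, Runnable postCleanAction, Executor executor) {
		cleaner.cleanCheckpoint(discardPrivateState, postCleanAction, executor);
	}
}
```

With this shape, everything that touches checkpoint internals stays inside the checkpoint class, and the cleaner never needs to know about operator states or storage locations.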
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws Exception {
 		recoveredTestCheckpoint.awaitDiscard();
 	}
 
+	/**
+	 * FLINK-17073 tests that there is no request triggered when there are too many checkpoints
+	 * waiting to clean and that it resumes when the number of waiting checkpoints has gone below
+	 * the threshold.
+	 *
+	 */
+	@Test
+	public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception{
+		ManualClock clock = new ManualClock();
+		clock.advanceTime(1, TimeUnit.DAYS);
+		int maxCleaningCheckpoints = 1;
+		CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
+		CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+		final int maxCheckpointsToRetain = 1;
+		Executors.PausableThreadPoolExecutor executor = Executors.pausableExecutor();

Review comment:
I think `ManuallyTriggeredScheduledExecutor` would suffice here. Did you consider using it?

########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws Exception {
 		recoveredTestCheckpoint.awaitDiscard();
 	}
 
+	/**
+	 * FLINK-17073 tests that there is no request triggered when there are too many checkpoints
+	 * waiting to clean and that it resumes when the number of waiting checkpoints has gone below
+	 * the threshold.
+	 *
+	 */
+	@Test
+	public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception{
+		ManualClock clock = new ManualClock();
+		clock.advanceTime(1, TimeUnit.DAYS);
+		int maxCleaningCheckpoints = 1;
+		CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
+		CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+		final int maxCheckpointsToRetain = 1;
+		Executors.PausableThreadPoolExecutor executor = Executors.pausableExecutor();
+		ZooKeeperCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+
+		//pause the executor to pause checkpoints cleaning, to allow assertions
+		executor.pause();
+
+		int nbCheckpointsToInject = 3;
+		for (int i = 1; i <= nbCheckpointsToInject; i++) {
+			// add checkpoints to clean
+			TestCompletedCheckpoint completedCheckpoint = new TestCompletedCheckpoint(new JobID(), i,
+				i, Collections.emptyMap(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+				checkpointsCleaner::cleanCheckpoint);
+			checkpointStore.addCheckpoint(completedCheckpoint);
+		}
+
+		Thread.sleep(100L); // give time to submit checkpoints for cleaning
+
+		int nbCheckpointsSubmittedForCleaningByCheckpointStore = nbCheckpointsToInject - maxCheckpointsToRetain;
+		assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
This `Thread.sleep` can be fragile. How about replacing it with a loop?

```
while (checkpointsCleaner.getNumberOfCheckpointsToClean() < nbCheckpointsToInject - maxCheckpointsToRetain) {
	Thread.sleep(10);
}
```

(ditto: the next `sleep`)
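For illustration, the polling idea with an explicit upper bound could look roughly like the self-contained helper below; the helper name and the timeout value are assumptions, not part of the PR:

```java
import java.util.function.IntSupplier;

// Illustrative test helper only: poll a counter until it reaches at least the expected
// value, giving up after a deadline so the test fails fast instead of hanging.
final class TestWaitUtil {

	static void waitForAtLeast(IntSupplier actual, int expected, long timeoutMillis) throws InterruptedException {
		long deadline = System.currentTimeMillis() + timeoutMillis;
		while (actual.getAsInt() < expected) {
			if (System.currentTimeMillis() > deadline) {
				throw new AssertionError(
					"Timed out waiting for value >= " + expected + ", last seen " + actual.getAsInt());
			}
			Thread.sleep(10);
		}
	}

	private TestWaitUtil() {
	}
}
```

In the test above, `Thread.sleep(100L)` could then be replaced by something like `TestWaitUtil.waitForAtLeast(checkpointsCleaner::getNumberOfCheckpointsToClean, nbCheckpointsToInject - maxCheckpointsToRetain, 10_000L);` (hypothetical call, not the PR's code).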
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ##########
@@ -108,6 +110,11 @@
 	@Nullable
 	private transient volatile CompletedCheckpointStats.DiscardCallback discardCallback;
 
+	private final CheckpointsCleaningRunner cleanCallback;
+
+	private final SerializableRunnable checkpointCleaningFinishedCallback;

Review comment:
Can we use uniform naming here, like `cleanupCallback` and `cleanupFinishedCallback`?

(ditto: `PendingCheckpoint`)

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ##########
@@ -210,6 +221,21 @@ public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg
 	// Discard and Dispose
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Asynchronously call a discard on the ioExecutor
+	 * (FixedThreadPool of configurable size of default 4*CPU cores)
+	 * and count the number of checkpoints that are waiting to clean.
+	 */
+	void asyncDiscardCheckpointAndCountCheckpoint(ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback, Executor executor){
+		cleanCallback.accept(() -> {
+			try {
+				discardCallback.accept(this);

Review comment:
This callback-calling-a-callback pattern and `callback.accept(this)` don't look very readable to me. Looking at `discardCallback`, I see:
1. it is a call to `CompletedCheckpoint.doDiscard` if some condition is `true`
2. the condition can be evaluated early

Therefore, we can get rid of it by:
1. evaluating the condition as early as possible in `ZooKeeperCompletedCheckpointStore` (maybe extract methods `CompletedCheckpoint.shouldDiscardOnShutdown` and `shouldDiscardOnSubsume`)
2. passing a `boolean discard` to `tryRemoveCompletedCheckpoint` and further down instead of the callback
3. replacing `discardCallback.accept(this);` with `if (discard) this.doDiscard();` in `CompletedCheckpoint.asyncDiscardCheckpointAndCountCheckpoint`

WDYT?
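A rough, hypothetical sketch of that shape (names and signatures are assumptions, not the PR's actual API): the store queries the retention conditions early and only a `boolean` travels down to the discard call.

```java
import java.util.concurrent.Executor;

// Sketch with simplified stand-in types, not the real CompletedCheckpoint.
class CompletedCheckpointSketch {
	private final boolean discardOnSubsume;
	private final boolean discardOnShutdown;

	CompletedCheckpointSketch(boolean discardOnSubsume, boolean discardOnShutdown) {
		this.discardOnSubsume = discardOnSubsume;
		this.discardOnShutdown = discardOnShutdown;
	}

	// The conditions become simple queries instead of being hidden inside a callback.
	boolean shouldDiscardOnSubsume() {
		return discardOnSubsume;
	}

	boolean shouldDiscardOnShutdown() {
		return discardOnShutdown;
	}

	// The caller decides early; only a boolean is passed down the call chain.
	void asyncDiscardIfNeeded(boolean discard, Executor executor) {
		executor.execute(() -> {
			if (discard) {
				doDiscard();
			}
		});
	}

	private void doDiscard() {
		// release state handles, dispose the storage location, etc.
	}
}
```

Usage from the store side could then read `completedCheckpoint.asyncDiscardIfNeeded(completedCheckpoint.shouldDiscardOnSubsume(), ioExecutor);` (again, an illustrative call only).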
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws Exception {
 		recoveredTestCheckpoint.awaitDiscard();
 	}
 
+	/**
+	 * FLINK-17073 tests that there is no request triggered when there are too many checkpoints
+	 * waiting to clean and that it resumes when the number of waiting checkpoints has gone below
+	 * the threshold.
+	 *
+	 */
+	@Test
+	public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception{
+		ManualClock clock = new ManualClock();
+		clock.advanceTime(1, TimeUnit.DAYS);
+		int maxCleaningCheckpoints = 1;
+		CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
+		CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+		final int maxCheckpointsToRetain = 1;
+		Executors.PausableThreadPoolExecutor executor = Executors.pausableExecutor();
+		ZooKeeperCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+
+		//pause the executor to pause checkpoints cleaning, to allow assertions
+		executor.pause();
+
+		int nbCheckpointsToInject = 3;
+		for (int i = 1; i <= nbCheckpointsToInject; i++) {
+			// add checkpoints to clean
+			TestCompletedCheckpoint completedCheckpoint = new TestCompletedCheckpoint(new JobID(), i,
+				i, Collections.emptyMap(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+				checkpointsCleaner::cleanCheckpoint);
+			checkpointStore.addCheckpoint(completedCheckpoint);
+		}
+
+		Thread.sleep(100L); // give time to submit checkpoints for cleaning
+
+		int nbCheckpointsSubmittedForCleaningByCheckpointStore = nbCheckpointsToInject - maxCheckpointsToRetain;
+		assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
An infinite loop will still fail the test with a timeout (this is not ideal, but I don't see any better way to do this).

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ##########
@@ -210,6 +221,21 @@ public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg
 	// Discard and Dispose
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Asynchronously call a discard on the ioExecutor
+	 * (FixedThreadPool of configurable size of default 4*CPU cores)
+	 * and count the number of checkpoints that are waiting to clean.
+	 */
+	void asyncDiscardCheckpointAndCountCheckpoint(ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback, Executor executor){
+		cleanCallback.accept(() -> {
+			try {
+				discardCallback.accept(this);

Review comment:
Sure, we shouldn't break the encapsulation of `CompletedCheckpoint`, but we could extract these conditions into public methods (`boolean shouldDiscardOnXxx()`) which would be used by the ZooKeeperStore. If you think it isn't worth it, I'm also fine with the current approach.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
