Repository: flink Updated Branches: refs/heads/release-1.2 9206df666 -> 9d59e008d
[FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks This closes #3548 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d59e008 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d59e008 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d59e008 Branch: refs/heads/release-1.2 Commit: 9d59e008d8849cdfe2daf302e251454435bb997f Parents: 9206df6 Author: Stephan Ewen <[email protected]> Authored: Wed Mar 15 16:44:41 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Mar 15 20:28:15 2017 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 82 +++++++++++++------- .../runtime/checkpoint/PendingCheckpoint.java | 38 +++++++++ .../checkpoint/CheckpointCoordinatorTest.java | 16 +++- .../checkpoint/PendingCheckpointTest.java | 20 ++++- 4 files changed, 126 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 78cad91..cb8417a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -36,6 +36,8 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.TaskStateHandles; +import 
org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,9 +48,10 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkArgument; @@ -123,7 +126,7 @@ public class CheckpointCoordinator { private final int maxConcurrentCheckpointAttempts; /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */ - private final Timer timer; + private final ScheduledThreadPoolExecutor timer; /** Actor that receives status updates from the execution graph this coordinator works for */ private JobStatusListener jobStatusListener; @@ -131,7 +134,8 @@ public class CheckpointCoordinator { /** The number of consecutive failed trigger attempts */ private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0); - private ScheduledTrigger currentPeriodicTrigger; + /** A handle to the current periodic trigger, to cancel it when necessary */ + private ScheduledFuture<?> currentPeriodicTrigger; /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */ private long lastCheckpointCompletionNanos; @@ -210,7 +214,13 @@ public class CheckpointCoordinator { this.checkpointDirectory = checkpointDirectory; this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); - this.timer = new Timer("Checkpoint Timer", true); + this.timer = new ScheduledThreadPoolExecutor(1, + new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer")); + + // make sure the timer internally cleans up and does not hold onto stale scheduled tasks + 
this.timer.setRemoveOnCancelPolicy(true); + this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); if (externalizeSettings.externalizeCheckpoints()) { LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory); @@ -259,7 +269,7 @@ public class CheckpointCoordinator { triggerRequestQueued = false; // shut down the thread that handles the timeouts and pending triggers - timer.cancel(); + timer.shutdownNow(); // clear and discard all pending checkpoints for (PendingCheckpoint pending : pendingCheckpoints.values()) { @@ -359,7 +369,7 @@ public class CheckpointCoordinator { if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { triggerRequestQueued = true; if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); @@ -371,13 +381,14 @@ public class CheckpointCoordinator { if (durationTillNextMillis > 0) { if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } - ScheduledTrigger trigger = new ScheduledTrigger(); // Reassign the new trigger to the currentPeriodicTrigger - currentPeriodicTrigger = trigger; - timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval); + currentPeriodicTrigger = timer.scheduleAtFixedRate( + new ScheduledTrigger(), + durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); + return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } @@ -450,7 +461,7 @@ public class CheckpointCoordinator { } // schedule the timer that will clean up the expired checkpoints - TimerTask canceller = new TimerTask() { + final Runnable canceller = new Runnable() { @Override public void run() { synchronized (lock) { @@ 
-486,7 +497,7 @@ public class CheckpointCoordinator { if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { triggerRequestQueued = true; if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); @@ -498,14 +509,15 @@ public class CheckpointCoordinator { if (durationTillNextMillis > 0) { if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } - ScheduledTrigger trigger = new ScheduledTrigger(); // Reassign the new trigger to the currentPeriodicTrigger - currentPeriodicTrigger = trigger; - timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval); + currentPeriodicTrigger = timer.scheduleAtFixedRate( + new ScheduledTrigger(), + durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); + return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } @@ -513,7 +525,15 @@ public class CheckpointCoordinator { LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp); pendingCheckpoints.put(checkpointID, checkpoint); - timer.schedule(canceller, checkpointTimeout); + + ScheduledFuture<?> cancellerHandle = timer.schedule( + canceller, + checkpointTimeout, TimeUnit.MILLISECONDS); + + if (!checkpoint.setCancellerHandle(cancellerHandle)) { + // checkpoint is already disposed! + cancellerHandle.cancel(false); + } } // end of lock scope @@ -623,7 +643,7 @@ public class CheckpointCoordinator { * @return Flag indicating whether the ack'd checkpoint was associated * with a pending checkpoint. * - * @throws Exception If the checkpoint cannot be added to the completed checkpoint store. + * @throws CheckpointException If the checkpoint cannot be added to the completed checkpoint store. 
*/ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException { if (shutdown || message == null) { @@ -819,20 +839,25 @@ public class CheckpointCoordinator { // trigger the checkpoint from the trigger timer, to finish the work of this thread before // starting with the next checkpoint - ScheduledTrigger trigger = new ScheduledTrigger(); if (periodicScheduling) { if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); } - currentPeriodicTrigger = trigger; - timer.scheduleAtFixedRate(trigger, 0L, baseInterval); + currentPeriodicTrigger = timer.scheduleAtFixedRate( + new ScheduledTrigger(), + 0L, baseInterval, TimeUnit.MILLISECONDS); } else { - timer.schedule(trigger, 0L); + timer.execute(new ScheduledTrigger()); } } } + @VisibleForTesting + int getNumScheduledTasks() { + return timer.getQueue().size(); + } + // -------------------------------------------------------------------------------------------- // Checkpoint State Restoring // -------------------------------------------------------------------------------------------- @@ -959,8 +984,9 @@ public class CheckpointCoordinator { stopCheckpointScheduler(); periodicScheduling = true; - currentPeriodicTrigger = new ScheduledTrigger(); - timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval); + currentPeriodicTrigger = timer.scheduleAtFixedRate( + new ScheduledTrigger(), + baseInterval, baseInterval, TimeUnit.MILLISECONDS); } } @@ -970,7 +996,7 @@ public class CheckpointCoordinator { periodicScheduling = false; if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } @@ -1003,7 +1029,7 @@ public class CheckpointCoordinator { // ------------------------------------------------------------------------ - private class ScheduledTrigger extends TimerTask { + private final class ScheduledTrigger implements Runnable { 
@Override public void run() { http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 1531f0f..9859e01 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -42,6 +42,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -95,6 +96,8 @@ public class PendingCheckpoint { @Nullable private PendingCheckpointStats statsCallback; + private volatile ScheduledFuture<?> cancellerHandle; + // -------------------------------------------------------------------------------------------- public PendingCheckpoint( @@ -191,6 +194,27 @@ public class PendingCheckpoint { this.statsCallback = checkNotNull(trackerCallback); } + /** + * Sets the handle for the canceller to this pending checkpoint. 
+ * + * @return true, if the handle was set, false, if the checkpoint is already disposed; + */ + public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) { + synchronized (lock) { + if (this.cancellerHandle == null) { + if (!discarded) { + this.cancellerHandle = cancellerHandle; + return true; + } else { + return false; + } + } + else { + throw new IllegalStateException("A canceller handle was already set"); + } + } + } + // ------------------------------------------------------------------------ // Progress and Completion // ------------------------------------------------------------------------ @@ -427,10 +451,24 @@ public class PendingCheckpoint { discarded = true; notYetAcknowledgedTasks.clear(); acknowledgedTasks.clear(); + cancelCanceller(); } } } + private void cancelCanceller() { + try { + final ScheduledFuture<?> canceller = this.cancellerHandle; + if (canceller != null) { + canceller.cancel(false); + } + } + catch (Exception e) { + // this code should not throw exceptions + LOG.warn("Error while cancelling checkpoint timeout task", e); + } + } + /** * Reports a failed checkpoint with the given optional cause. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 6ba557b..eeab445 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -299,6 +299,9 @@ public class CheckpointCoordinatorTest { assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + // we have one task scheduled that will cancel after timeout + assertEquals(1, coord.getNumScheduledTasks()); + long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId); @@ -336,6 +339,9 @@ public class CheckpointCoordinatorTest { coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId)); assertTrue(checkpoint.isDiscarded()); + // the canceler is also removed + assertEquals(0, coord.getNumScheduledTasks()); + // validate that we have no new pending checkpoint assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -389,6 +395,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(0, coord.getNumScheduledTasks()); // trigger the first checkpoint. 
this should succeed assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -399,6 +406,7 @@ public class CheckpointCoordinatorTest { // validate that we have a pending checkpoint assertEquals(2, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(2, coord.getNumScheduledTasks()); Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator(); long checkpoint1Id = it.next().getKey(); @@ -439,13 +447,13 @@ public class CheckpointCoordinatorTest { } // decline checkpoint from one of the tasks, this should cancel the checkpoint - // and trigger a new one coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id)); assertTrue(checkpoint1.isDiscarded()); // validate that we have only one pending checkpoint left assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(1, coord.getNumScheduledTasks()); // validate that it is the same second checkpoint from earlier long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); @@ -506,6 +514,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(0, coord.getNumScheduledTasks()); // trigger the first checkpoint. 
this should succeed assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -513,6 +522,7 @@ public class CheckpointCoordinatorTest { // validate that we have a pending checkpoint assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(1, coord.getNumScheduledTasks()); long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId); @@ -558,6 +568,9 @@ public class CheckpointCoordinatorTest { assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); + // the canceler should be removed now + assertEquals(0, coord.getNumScheduledTasks()); + // validate that the relevant tasks got a confirmation message { verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp)); @@ -583,6 +596,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(0, coord.getNumScheduledTasks()); CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0); assertEquals(jid, successNew.getJobId()); http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 4358526..cee7dd5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -36,6 +37,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -44,7 +46,6 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -279,6 +280,23 @@ public class PendingCheckpointTest { } } + @Test + public void testSetCanceller() { + final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true); + + PendingCheckpoint aborted = createPendingCheckpoint(props, null); + aborted.abortDeclined(); + assertTrue(aborted.isDiscarded()); + assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class))); + + PendingCheckpoint pending = createPendingCheckpoint(props, null); + ScheduledFuture<?> canceller = mock(ScheduledFuture.class); + + assertTrue(pending.setCancellerHandle(canceller)); + pending.abortDeclined(); + verify(canceller).cancel(false); + } + // ------------------------------------------------------------------------ private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
