Repository: flink Updated Branches: refs/heads/master d160b5e56 -> 24408e190
[FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks This closes #3548 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70252f34 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70252f34 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70252f34 Branch: refs/heads/master Commit: 70252f3468916758e8bc456bbf482549c38ad7ff Parents: afd36f9 Author: Stephan Ewen <[email protected]> Authored: Wed Mar 15 16:44:41 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Mar 16 14:43:26 2017 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 79 +++++++++++++------- .../runtime/checkpoint/PendingCheckpoint.java | 38 ++++++++++ .../checkpoint/CheckpointCoordinatorTest.java | 16 +++- .../checkpoint/PendingCheckpointTest.java | 18 +++++ 4 files changed, 123 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 0592e3d..cc60837 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @@ -49,9 +50,10 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkArgument; @@ -134,7 +136,7 @@ public class CheckpointCoordinator { private final int maxConcurrentCheckpointAttempts; /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */ - private final Timer timer; + private final ScheduledThreadPoolExecutor timer; /** Actor that receives status updates from the execution graph this coordinator works for */ private JobStatusListener jobStatusListener; @@ -142,7 +144,8 @@ public class CheckpointCoordinator { /** The number of consecutive failed trigger attempts */ private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0); - private ScheduledTrigger currentPeriodicTrigger; + /** A handle to the current periodic trigger, to cancel it when necessary */ + private ScheduledFuture<?> currentPeriodicTrigger; /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */ private long lastCheckpointCompletionNanos; @@ -218,7 +221,13 @@ public class CheckpointCoordinator { this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); - this.timer = new Timer("Checkpoint Timer", true); + this.timer = new ScheduledThreadPoolExecutor(1, + new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer")); + + // make sure the timer internally cleans up and does not hold onto stale scheduled tasks + this.timer.setRemoveOnCancelPolicy(true); + this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + 
this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); if (externalizeSettings.externalizeCheckpoints()) { LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory); @@ -265,7 +274,7 @@ public class CheckpointCoordinator { triggerRequestQueued = false; // shut down the thread that handles the timeouts and pending triggers - timer.cancel(); + timer.shutdownNow(); // clear and discard all pending checkpoints for (PendingCheckpoint pending : pendingCheckpoints.values()) { @@ -392,7 +401,7 @@ public class CheckpointCoordinator { if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { triggerRequestQueued = true; if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); @@ -404,13 +413,14 @@ public class CheckpointCoordinator { if (durationTillNextMillis > 0) { if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } - ScheduledTrigger trigger = new ScheduledTrigger(); // Reassign the new trigger to the currentPeriodicTrigger - currentPeriodicTrigger = trigger; - timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval); + currentPeriodicTrigger = timer.scheduleAtFixedRate( + new ScheduledTrigger(), + durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); + return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } @@ -483,7 +493,7 @@ public class CheckpointCoordinator { } // schedule the timer that will clean up the expired checkpoints - TimerTask canceller = new TimerTask() { + final Runnable canceller = new Runnable() { @Override public void run() { synchronized (lock) { @@ -519,7 +529,7 @@ public class CheckpointCoordinator { if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) 
{ triggerRequestQueued = true; if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); @@ -531,14 +541,15 @@ public class CheckpointCoordinator { if (durationTillNextMillis > 0) { if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } - ScheduledTrigger trigger = new ScheduledTrigger(); // Reassign the new trigger to the currentPeriodicTrigger - currentPeriodicTrigger = trigger; - timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval); + currentPeriodicTrigger = timer.scheduleAtFixedRate( + new ScheduledTrigger(), + durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); + return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } @@ -546,7 +557,15 @@ public class CheckpointCoordinator { LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp); pendingCheckpoints.put(checkpointID, checkpoint); - timer.schedule(canceller, checkpointTimeout); + + ScheduledFuture<?> cancellerHandle = timer.schedule( + canceller, + checkpointTimeout, TimeUnit.MILLISECONDS); + + if (!checkpoint.setCancellerHandle(cancellerHandle)) { + // checkpoint is already disposed! 
+ cancellerHandle.cancel(false); + } } // end of lock scope @@ -866,20 +885,25 @@ public class CheckpointCoordinator { // trigger the checkpoint from the trigger timer, to finish the work of this thread before // starting with the next checkpoint - ScheduledTrigger trigger = new ScheduledTrigger(); if (periodicScheduling) { if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); } - currentPeriodicTrigger = trigger; - timer.scheduleAtFixedRate(trigger, 0L, baseInterval); + currentPeriodicTrigger = timer.scheduleAtFixedRate( + new ScheduledTrigger(), + 0L, baseInterval, TimeUnit.MILLISECONDS); } else { - timer.schedule(trigger, 0L); + timer.execute(new ScheduledTrigger()); } } } + @VisibleForTesting + int getNumScheduledTasks() { + return timer.getQueue().size(); + } + // -------------------------------------------------------------------------------------------- // Checkpoint State Restoring // -------------------------------------------------------------------------------------------- @@ -1006,8 +1030,9 @@ public class CheckpointCoordinator { stopCheckpointScheduler(); periodicScheduling = true; - currentPeriodicTrigger = new ScheduledTrigger(); - timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval); + currentPeriodicTrigger = timer.scheduleAtFixedRate( + new ScheduledTrigger(), + baseInterval, baseInterval, TimeUnit.MILLISECONDS); } } @@ -1017,7 +1042,7 @@ public class CheckpointCoordinator { periodicScheduling = false; if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); + currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } @@ -1050,7 +1075,7 @@ public class CheckpointCoordinator { // ------------------------------------------------------------------------ - private class ScheduledTrigger extends TimerTask { + private final class ScheduledTrigger implements Runnable { @Override public void run() { 
http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 5ca6040..b7eb037 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -46,6 +46,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -99,6 +100,8 @@ public class PendingCheckpoint { @Nullable private PendingCheckpointStats statsCallback; + private volatile ScheduledFuture<?> cancellerHandle; + // -------------------------------------------------------------------------------------------- public PendingCheckpoint( @@ -197,6 +200,27 @@ public class PendingCheckpoint { this.statsCallback = trackerCallback; } + /** + * Sets the handle for the canceller to this pending checkpoint. 
+ * + * @return true, if the handle was set, false, if the checkpoint is already disposed; + */ + public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) { + synchronized (lock) { + if (this.cancellerHandle == null) { + if (!discarded) { + this.cancellerHandle = cancellerHandle; + return true; + } else { + return false; + } + } + else { + throw new IllegalStateException("A canceller handle was already set"); + } + } + } + // ------------------------------------------------------------------------ // Progress and Completion // ------------------------------------------------------------------------ @@ -490,10 +514,24 @@ public class PendingCheckpoint { discarded = true; notYetAcknowledgedTasks.clear(); acknowledgedTasks.clear(); + cancelCanceller(); } } } + private void cancelCanceller() { + try { + final ScheduledFuture<?> canceller = this.cancellerHandle; + if (canceller != null) { + canceller.cancel(false); + } + } + catch (Exception e) { + // this code should not throw exceptions + LOG.warn("Error while cancelling checkpoint timeout task", e); + } + } + /** * Reports a failed checkpoint with the given optional cause. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 1691370..d8bba59 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -299,6 +299,9 @@ public class CheckpointCoordinatorTest { assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + // we have one task scheduled that will cancel after timeout + assertEquals(1, coord.getNumScheduledTasks()); + long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId); @@ -336,6 +339,9 @@ public class CheckpointCoordinatorTest { coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId)); assertTrue(checkpoint.isDiscarded()); + // the canceler is also removed + assertEquals(0, coord.getNumScheduledTasks()); + // validate that we have no new pending checkpoint assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -389,6 +395,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(0, coord.getNumScheduledTasks()); // trigger the first checkpoint. 
this should succeed assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -399,6 +406,7 @@ public class CheckpointCoordinatorTest { // validate that we have a pending checkpoint assertEquals(2, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(2, coord.getNumScheduledTasks()); Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator(); long checkpoint1Id = it.next().getKey(); @@ -439,13 +447,13 @@ public class CheckpointCoordinatorTest { } // decline checkpoint from one of the tasks, this should cancel the checkpoint - // and trigger a new one coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id)); assertTrue(checkpoint1.isDiscarded()); // validate that we have only one pending checkpoint left assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(1, coord.getNumScheduledTasks()); // validate that it is the same second checkpoint from earlier long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); @@ -506,6 +514,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(0, coord.getNumScheduledTasks()); // trigger the first checkpoint. 
this should succeed assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -513,6 +522,7 @@ public class CheckpointCoordinatorTest { // validate that we have a pending checkpoint assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(1, coord.getNumScheduledTasks()); long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId); @@ -556,6 +566,9 @@ public class CheckpointCoordinatorTest { assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); + // the canceler should be removed now + assertEquals(0, coord.getNumScheduledTasks()); + // validate that the relevant tasks got a confirmation message { verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); @@ -580,6 +593,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(0, coord.getNumScheduledTasks()); CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0); assertEquals(jid, successNew.getJobId()); http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 6f04f39..55b5fe0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -38,6 
+38,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -287,6 +288,23 @@ public class PendingCheckpointTest { } } + @Test + public void testSetCanceller() { + final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true); + + PendingCheckpoint aborted = createPendingCheckpoint(props, null); + aborted.abortDeclined(); + assertTrue(aborted.isDiscarded()); + assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class))); + + PendingCheckpoint pending = createPendingCheckpoint(props, null); + ScheduledFuture<?> canceller = mock(ScheduledFuture.class); + + assertTrue(pending.setCancellerHandle(canceller)); + pending.abortDeclined(); + verify(canceller).cancel(false); + } + // ------------------------------------------------------------------------ private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
