Repository: flink
Updated Branches:
  refs/heads/release-1.2 9206df666 -> 9d59e008d


[FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to 
prevent memory leaks

This closes #3548


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d59e008
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d59e008
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d59e008

Branch: refs/heads/release-1.2
Commit: 9d59e008d8849cdfe2daf302e251454435bb997f
Parents: 9206df6
Author: Stephan Ewen <[email protected]>
Authored: Wed Mar 15 16:44:41 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Mar 15 20:28:15 2017 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 82 +++++++++++++-------
 .../runtime/checkpoint/PendingCheckpoint.java   | 38 +++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   | 16 +++-
 .../checkpoint/PendingCheckpointTest.java       | 20 ++++-
 4 files changed, 126 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 78cad91..cb8417a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -36,6 +36,8 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,9 +48,10 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -123,7 +126,7 @@ public class CheckpointCoordinator {
        private final int maxConcurrentCheckpointAttempts;
 
        /** The timer that handles the checkpoint timeouts and triggers 
periodic checkpoints */
-       private final Timer timer;
+       private final ScheduledThreadPoolExecutor timer;
 
        /** Actor that receives status updates from the execution graph this 
coordinator works for */
        private JobStatusListener jobStatusListener;
@@ -131,7 +134,8 @@ public class CheckpointCoordinator {
        /** The number of consecutive failed trigger attempts */
        private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new 
AtomicInteger(0);
 
-       private ScheduledTrigger currentPeriodicTrigger;
+       /** A handle to the current periodic trigger, to cancel it when 
necessary */
+       private ScheduledFuture<?> currentPeriodicTrigger;
 
        /** The timestamp (via {@link System#nanoTime()}) when the last 
checkpoint completed */
        private long lastCheckpointCompletionNanos;
@@ -210,7 +214,13 @@ public class CheckpointCoordinator {
                this.checkpointDirectory = checkpointDirectory;
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 
-               this.timer = new Timer("Checkpoint Timer", true);
+               this.timer = new ScheduledThreadPoolExecutor(1,
+                               new 
DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint 
Timer"));
+
+               // make sure the timer internally cleans up and does not hold 
onto stale scheduled tasks
+               this.timer.setRemoveOnCancelPolicy(true);
+               
this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+               
this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 
                if (externalizeSettings.externalizeCheckpoints()) {
                        LOG.info("Persisting periodic checkpoints externally at 
{}.", checkpointDirectory);
@@ -259,7 +269,7 @@ public class CheckpointCoordinator {
                                triggerRequestQueued = false;
 
                                // shut down the thread that handles the 
timeouts and pending triggers
-                               timer.cancel();
+                               timer.shutdownNow();
 
                                // clear and discard all pending checkpoints
                                for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
@@ -359,7 +369,7 @@ public class CheckpointCoordinator {
                                if (pendingCheckpoints.size() >= 
maxConcurrentCheckpointAttempts) {
                                        triggerRequestQueued = true;
                                        if (currentPeriodicTrigger != null) {
-                                               currentPeriodicTrigger.cancel();
+                                               
currentPeriodicTrigger.cancel(false);
                                                currentPeriodicTrigger = null;
                                        }
                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -371,13 +381,14 @@ public class CheckpointCoordinator {
 
                                if (durationTillNextMillis > 0) {
                                        if (currentPeriodicTrigger != null) {
-                                               currentPeriodicTrigger.cancel();
+                                               
currentPeriodicTrigger.cancel(false);
                                                currentPeriodicTrigger = null;
                                        }
-                                       ScheduledTrigger trigger = new 
ScheduledTrigger();
                                        // Reassign the new trigger to the 
currentPeriodicTrigger
-                                       currentPeriodicTrigger = trigger;
-                                       timer.scheduleAtFixedRate(trigger, 
durationTillNextMillis, baseInterval);
+                                       currentPeriodicTrigger = 
timer.scheduleAtFixedRate(
+                                                       new ScheduledTrigger(),
+                                                       durationTillNextMillis, 
baseInterval, TimeUnit.MILLISECONDS);
+
                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                                }
                        }
@@ -450,7 +461,7 @@ public class CheckpointCoordinator {
                        }
 
                        // schedule the timer that will clean up the expired 
checkpoints
-                       TimerTask canceller = new TimerTask() {
+                       final Runnable canceller = new Runnable() {
                                @Override
                                public void run() {
                                        synchronized (lock) {
@@ -486,7 +497,7 @@ public class CheckpointCoordinator {
                                                if (pendingCheckpoints.size() 
>= maxConcurrentCheckpointAttempts) {
                                                        triggerRequestQueued = 
true;
                                                        if 
(currentPeriodicTrigger != null) {
-                                                               
currentPeriodicTrigger.cancel();
+                                                               
currentPeriodicTrigger.cancel(false);
                                                                
currentPeriodicTrigger = null;
                                                        }
                                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -498,14 +509,15 @@ public class CheckpointCoordinator {
 
                                                if (durationTillNextMillis > 0) 
{
                                                        if 
(currentPeriodicTrigger != null) {
-                                                               
currentPeriodicTrigger.cancel();
+                                                               
currentPeriodicTrigger.cancel(false);
                                                                
currentPeriodicTrigger = null;
                                                        }
 
-                                                       ScheduledTrigger 
trigger = new ScheduledTrigger();
                                                        // Reassign the new 
trigger to the currentPeriodicTrigger
-                                                       currentPeriodicTrigger 
= trigger;
-                                                       
timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+                                                       currentPeriodicTrigger 
= timer.scheduleAtFixedRate(
+                                                                       new 
ScheduledTrigger(),
+                                                                       
durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
                                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                                                }
                                        }
@@ -513,7 +525,15 @@ public class CheckpointCoordinator {
                                        LOG.info("Triggering checkpoint " + 
checkpointID + " @ " + timestamp);
 
                                        pendingCheckpoints.put(checkpointID, 
checkpoint);
-                                       timer.schedule(canceller, 
checkpointTimeout);
+
+                                       ScheduledFuture<?> cancellerHandle = 
timer.schedule(
+                                                       canceller,
+                                                       checkpointTimeout, 
TimeUnit.MILLISECONDS);
+
+                                       if 
(!checkpoint.setCancellerHandle(cancellerHandle)) {
+                                               // checkpoint is already 
disposed!
+                                               cancellerHandle.cancel(false);
+                                       }
                                }
                                // end of lock scope
 
@@ -623,7 +643,7 @@ public class CheckpointCoordinator {
         * @return Flag indicating whether the ack'd checkpoint was associated
         * with a pending checkpoint.
         *
-        * @throws Exception If the checkpoint cannot be added to the completed 
checkpoint store.
+        * @throws CheckpointException If the checkpoint cannot be added to the 
completed checkpoint store.
         */
        public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) 
throws CheckpointException {
                if (shutdown || message == null) {
@@ -819,20 +839,25 @@ public class CheckpointCoordinator {
 
                        // trigger the checkpoint from the trigger timer, to 
finish the work of this thread before
                        // starting with the next checkpoint
-                       ScheduledTrigger trigger = new ScheduledTrigger();
                        if (periodicScheduling) {
                                if (currentPeriodicTrigger != null) {
-                                       currentPeriodicTrigger.cancel();
+                                       currentPeriodicTrigger.cancel(false);
                                }
-                               currentPeriodicTrigger = trigger;
-                               timer.scheduleAtFixedRate(trigger, 0L, 
baseInterval);
+                               currentPeriodicTrigger = 
timer.scheduleAtFixedRate(
+                                               new ScheduledTrigger(),
+                                               0L, baseInterval, 
TimeUnit.MILLISECONDS);
                        }
                        else {
-                               timer.schedule(trigger, 0L);
+                               timer.execute(new ScheduledTrigger());
                        }
                }
        }
 
+       @VisibleForTesting
+       int getNumScheduledTasks() {
+               return timer.getQueue().size();
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Checkpoint State Restoring
        // 
--------------------------------------------------------------------------------------------
@@ -959,8 +984,9 @@ public class CheckpointCoordinator {
                        stopCheckpointScheduler();
 
                        periodicScheduling = true;
-                       currentPeriodicTrigger = new ScheduledTrigger();
-                       timer.scheduleAtFixedRate(currentPeriodicTrigger, 
baseInterval, baseInterval);
+                       currentPeriodicTrigger = timer.scheduleAtFixedRate(
+                                       new ScheduledTrigger(), 
+                                       baseInterval, baseInterval, 
TimeUnit.MILLISECONDS);
                }
        }
 
@@ -970,7 +996,7 @@ public class CheckpointCoordinator {
                        periodicScheduling = false;
 
                        if (currentPeriodicTrigger != null) {
-                               currentPeriodicTrigger.cancel();
+                               currentPeriodicTrigger.cancel(false);
                                currentPeriodicTrigger = null;
                        }
 
@@ -1003,7 +1029,7 @@ public class CheckpointCoordinator {
 
        // 
------------------------------------------------------------------------
 
-       private class ScheduledTrigger extends TimerTask {
+       private final class ScheduledTrigger implements Runnable {
 
                @Override
                public void run() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1531f0f..9859e01 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -42,6 +42,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -95,6 +96,8 @@ public class PendingCheckpoint {
        @Nullable
        private PendingCheckpointStats statsCallback;
 
+       private volatile ScheduledFuture<?> cancellerHandle;
+
        // 
--------------------------------------------------------------------------------------------
 
        public PendingCheckpoint(
@@ -191,6 +194,27 @@ public class PendingCheckpoint {
                this.statsCallback = checkNotNull(trackerCallback);
        }
 
+       /**
+        * Sets the handle for the canceller to this pending checkpoint.
+        * 
+        * @return true, if the handle was set, false, if the checkpoint is 
already disposed.
+        */
+       public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
+               synchronized (lock) {
+                       if (this.cancellerHandle == null) {
+                               if (!discarded) {
+                                       this.cancellerHandle = cancellerHandle;
+                                       return true;
+                               } else {
+                                       return false;
+                               }
+                       }
+                       else {
+                               throw new IllegalStateException("A canceller 
handle was already set");
+                       }
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Progress and Completion
        // 
------------------------------------------------------------------------
@@ -427,10 +451,24 @@ public class PendingCheckpoint {
                                discarded = true;
                                notYetAcknowledgedTasks.clear();
                                acknowledgedTasks.clear();
+                               cancelCanceller();
                        }
                }
        }
 
+       private void cancelCanceller() {
+               try {
+                       final ScheduledFuture<?> canceller = 
this.cancellerHandle;
+                       if (canceller != null) {
+                               canceller.cancel(false);
+                       }
+               }
+               catch (Exception e) {
+                       // this code should not throw exceptions
+                       LOG.warn("Error while cancelling checkpoint timeout 
task", e);
+               }
+       }
+
        /**
         * Reports a failed checkpoint with the given optional cause.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6ba557b..eeab445 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -299,6 +299,9 @@ public class CheckpointCoordinatorTest {
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
+                       // we have one task scheduled that will cancel after 
timeout
+                       assertEquals(1, coord.getNumScheduledTasks());
+
                        long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
                        PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
 
@@ -336,6 +339,9 @@ public class CheckpointCoordinatorTest {
                        coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
                        assertTrue(checkpoint.isDiscarded());
 
+                       // the canceller is also removed
+                       assertEquals(0, coord.getNumScheduledTasks());
+
                        // validate that we have no new pending checkpoint
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -389,6 +395,7 @@ public class CheckpointCoordinatorTest {
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(0, coord.getNumScheduledTasks());
 
                        // trigger the first checkpoint. this should succeed
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -399,6 +406,7 @@ public class CheckpointCoordinatorTest {
                        // validate that we have a pending checkpoint
                        assertEquals(2, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(2, coord.getNumScheduledTasks());
 
                        Iterator<Map.Entry<Long, PendingCheckpoint>> it = 
coord.getPendingCheckpoints().entrySet().iterator();
                        long checkpoint1Id = it.next().getKey();
@@ -439,13 +447,13 @@ public class CheckpointCoordinatorTest {
                        }
 
                        // decline checkpoint from one of the tasks, this 
should cancel the checkpoint
-                       // and trigger a new one
                        coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id));
                        assertTrue(checkpoint1.isDiscarded());
 
                        // validate that we have only one pending checkpoint 
left
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(1, coord.getNumScheduledTasks());
 
                        // validate that it is the same second checkpoint from 
earlier
                        long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
@@ -506,6 +514,7 @@ public class CheckpointCoordinatorTest {
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(0, coord.getNumScheduledTasks());
 
                        // trigger the first checkpoint. this should succeed
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -513,6 +522,7 @@ public class CheckpointCoordinatorTest {
                        // validate that we have a pending checkpoint
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(1, coord.getNumScheduledTasks());
 
                        long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
                        PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
@@ -558,6 +568,9 @@ public class CheckpointCoordinatorTest {
                        assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
+                       // the canceller should be removed now
+                       assertEquals(0, coord.getNumScheduledTasks());
+
                        // validate that the relevant tasks got a confirmation 
message
                        {
                                verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
@@ -583,6 +596,7 @@ public class CheckpointCoordinatorTest {
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(0, coord.getNumScheduledTasks());
 
                        CompletedCheckpoint successNew = 
coord.getSuccessfulCheckpoints().get(0);
                        assertEquals(jid, successNew.getJobId());

http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 4358526..cee7dd5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -36,6 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -44,7 +46,6 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -279,6 +280,23 @@ public class PendingCheckpointTest {
                }
        }
 
+       @Test
+       public void testSetCanceller() {
+               final CheckpointProperties props = new 
CheckpointProperties(false, false, true, true, true, true, true);
+
+               PendingCheckpoint aborted = createPendingCheckpoint(props, 
null);
+               aborted.abortDeclined();
+               assertTrue(aborted.isDiscarded());
+               
assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
+
+               PendingCheckpoint pending = createPendingCheckpoint(props, 
null);
+               ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
+
+               assertTrue(pending.setCancellerHandle(canceller));
+               pending.abortDeclined();
+               verify(canceller).cancel(false);
+       }
+
        // 
------------------------------------------------------------------------
 
        private static PendingCheckpoint 
createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {

Reply via email to