[FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously
The CheckpointCoordinator is now given an Executor which is used to execute the state discard calls asynchronously. This will prevent blocking operations to be executed from within the calling thread. Shut down ExecutorServices gracefully This closes #2825. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c590912c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c590912c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c590912c Branch: refs/heads/master Commit: c590912c93a4059b40452dfa6cffbdd4d58cac13 Parents: 3fb92d8 Author: Till Rohrmann <[email protected]> Authored: Thu Nov 17 15:39:11 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Nov 22 23:00:17 2016 +0100 ---------------------------------------------------------------------- .../MesosApplicationMasterRunner.java | 69 +- .../clusterframework/MesosJobManager.scala | 34 +- .../webmonitor/BackPressureStatsTracker.java | 2 +- .../BackPressureStatsTrackerTest.java | 2 +- .../checkpoint/CheckpointCoordinator.java | 104 ++- .../runtime/checkpoint/PendingCheckpoint.java | 31 +- .../flink/runtime/concurrent/Executors.java | 49 ++ .../runtime/executiongraph/ExecutionGraph.java | 51 +- .../executiongraph/ExecutionGraphBuilder.java | 28 +- .../runtime/executiongraph/ExecutionVertex.java | 4 +- .../restart/FailureRateRestartStrategy.java | 2 +- .../restart/FixedDelayRestartStrategy.java | 2 +- .../ContaineredJobManager.scala | 9 +- .../flink/runtime/jobmanager/JobManager.scala | 30 +- .../runtime/minicluster/FlinkMiniCluster.scala | 11 +- .../minicluster/LocalFlinkMiniCluster.scala | 38 +- .../checkpoint/CheckpointCoordinatorTest.java | 679 ++++++++++--------- .../checkpoint/CheckpointStateRestoreTest.java | 83 +-- ...ExecutionGraphCheckpointCoordinatorTest.java | 1 + .../checkpoint/PendingCheckpointTest.java | 11 +- .../executiongraph/AllVerticesIteratorTest.java | 2 +- .../ArchivedExecutionGraphTest.java | 1 + .../ExecutionGraphConstructionTest.java | 24 +- .../ExecutionGraphDeploymentTest.java | 7 +- .../ExecutionGraphMetricsTest.java | 1 + .../ExecutionGraphRestartTest.java | 17 +- .../ExecutionGraphSignalsTest.java | 1 + .../executiongraph/ExecutionGraphTestUtils.java | 1 + .../ExecutionStateProgressTest.java | 3 +- .../executiongraph/PointwisePatternTest.java | 21 +- .../TerminalStateDeadlockTest.java | 1 + .../executiongraph/VertexSlotSharingTest.java | 15 +- .../restart/FixedDelayRestartStrategyTest.java | 2 +- .../jobmanager/JobManagerHARecoveryTest.java | 38 +- .../JobManagerLeaderElectionTest.java | 29 +- .../TaskManagerLossFailsTasksTest.scala | 1 + .../runtime/testingUtils/TestingCluster.scala | 8 +- .../testingUtils/TestingJobManager.scala | 6 +- .../partitioner/RescalePartitionerTest.java | 1 + .../flink/yarn/TestingYarnJobManager.scala | 9 +- .../flink/yarn/YarnApplicationMasterRunner.java | 17 +- .../org/apache/flink/yarn/YarnJobManager.scala | 9 +- 42 files changed, 834 insertions(+), 620 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 166218f..3695578 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -65,6 +65,7 @@ import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.net.InetAddress; import java.net.URL; +import java.net.UnknownHostException; import java.security.PrivilegedAction; import java.util.Map; import java.util.UUID; @@ -172,43 +173,51 @@ public class MesosApplicationMasterRunner { WebMonitor webMonitor = null; MesosArtifactServer artifactServer = null; - int numberProcessors = Hardware.getNumberCPUCores(); + // ------- (1) load and parse / validate all configurations ------- - final ExecutorService futureExecutor = Executors.newFixedThreadPool( - numberProcessors, - new NamedThreadFactory("mesos-jobmanager-future-", "-thread-")); + // loading all config values here has the advantage that the program fails fast, if any + // configuration problem occurs - final ExecutorService ioExecutor = Executors.newFixedThreadPool( - numberProcessors, - new NamedThreadFactory("mesos-jobmanager-io-", "-thread-")); + final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX); + checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX); + + final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID); + checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID); + + // Note that we use the "appMasterHostname" given by the system, to make sure + // we use the hostnames consistently throughout akka. + // for akka "localhost" and "localhost.localdomain" are different actors. + final String appMasterHostname; try { - // ------- (1) load and parse / validate all configurations ------- + appMasterHostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException uhe) { + LOG.error("Could not retrieve the local hostname.", uhe); - // loading all config values here has the advantage that the program fails fast, if any - // configuration problem occurs + return INIT_ERROR_EXIT_CODE; + } - final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX); - checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX); + // Flink configuration + final Configuration dynamicProperties = + FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); - final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID); - checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID); + final Configuration config = createConfiguration(workingDir, dynamicProperties); - // Note that we use the "appMasterHostname" given by the system, to make sure - // we use the hostnames consistently throughout akka. - // for akka "localhost" and "localhost.localdomain" are different actors. - final String appMasterHostname = InetAddress.getLocalHost().getHostName(); + // Mesos configuration + final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname); - // Flink configuration - final Configuration dynamicProperties = - FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); - LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + int numberProcessors = Hardware.getNumberCPUCores(); - final Configuration config = createConfiguration(workingDir, dynamicProperties); + final ExecutorService futureExecutor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("mesos-jobmanager-future-", "-thread-")); - // Mesos configuration - final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname); + final ExecutorService ioExecutor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("mesos-jobmanager-io-", "-thread-")); + try { // environment values related to TM final int taskManagerContainerMemory; final int numInitialTaskManagers; @@ -380,6 +389,9 @@ public class MesosApplicationMasterRunner { } } + futureExecutor.shutdownNow(); + ioExecutor.shutdownNow(); + return INIT_ERROR_EXIT_CODE; } @@ -404,8 +416,11 @@ public class MesosApplicationMasterRunner { LOG.error("Failed to stop the artifact server", t); } - futureExecutor.shutdownNow(); - ioExecutor.shutdownNow(); + org.apache.flink.runtime.concurrent.Executors.gracefulShutdown( + AkkaUtils.getTimeout(config).toMillis(), + TimeUnit.MILLISECONDS, + futureExecutor, + ioExecutor); return 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala index 300539c..38886f8 100644 --- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala +++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala @@ -37,8 +37,9 @@ import scala.concurrent.duration._ /** JobManager actor for execution on Mesos. . * * @param flinkConfiguration Configuration object for the actor - * @param executor Execution context which is used to execute concurrent tasks in the + * @param futureExecutor Execution context which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param ioExecutor for blocking io operations * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] * @param scheduler Scheduler to schedule Flink jobs @@ -48,22 +49,25 @@ import scala.concurrent.duration._ * @param timeout Timeout for futures * @param leaderElectionService LeaderElectionService to participate in the leader election */ -class MesosJobManager(flinkConfiguration: FlinkConfiguration, - executor: Executor, - instanceManager: InstanceManager, - scheduler: FlinkScheduler, - libraryCacheManager: BlobLibraryCacheManager, - archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, - timeout: FiniteDuration, - leaderElectionService: LeaderElectionService, - submittedJobGraphs : SubmittedJobGraphStore, - checkpointRecoveryFactory : CheckpointRecoveryFactory, - jobRecoveryTimeout: FiniteDuration, - metricsRegistry: Option[FlinkMetricRegistry]) +class MesosJobManager( + flinkConfiguration: FlinkConfiguration, + futureExecutor: Executor, + ioExecutor: Executor, + instanceManager: InstanceManager, + scheduler: FlinkScheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[FlinkMetricRegistry]) extends ContaineredJobManager( flinkConfiguration, - executor, + futureExecutor, + ioExecutor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java index 3702eb4..97de89b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java @@ -165,7 +165,7 @@ public class BackPressureStatsTracker { if (!pendingStats.contains(vertex) && !vertex.getGraph().getState().isGloballyTerminalState()) { - Executor executor = vertex.getGraph().getExecutor(); + Executor executor = vertex.getGraph().getFutureExecutor(); // Only trigger if still active job if (executor != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java index 7ac2a69..c7e303d 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java @@ -66,7 +66,7 @@ public class BackPressureStatsTrackerTest { when(graph.getState()).thenReturn(JobStatus.RUNNING); // Same Thread execution context - when(graph.getExecutor()).thenReturn(new Executor() { + when(graph.getFutureExecutor()).thenReturn(new Executor() { @Override public void execute(Runnable runnable) { http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 886409d..638e0a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.TaskStateHandles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkArgument; @@ -150,6 +152,8 @@ public class CheckpointCoordinator { /** Default checkpoint properties **/ private final CheckpointProperties checkpointProperties; + private final Executor executor; + // -------------------------------------------------------------------------------------------- public CheckpointCoordinator( @@ -165,7 +169,8 @@ public class CheckpointCoordinator { CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, String checkpointDirectory, - CheckpointStatsTracker statsTracker) { + CheckpointStatsTracker statsTracker, + Executor executor) { // sanity checks checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero"); @@ -216,6 +221,8 @@ public class CheckpointCoordinator { } catch (Throwable t) { throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t); } + + this.executor = checkNotNull(executor); } // -------------------------------------------------------------------------------------------- @@ -296,7 +303,7 @@ public class CheckpointCoordinator { * the checkpoint will be declined. * @return <code>true</code> if triggering the checkpoint succeeded. */ - public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) throws Exception { + public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess(); } @@ -305,7 +312,7 @@ public class CheckpointCoordinator { long timestamp, CheckpointProperties props, String targetDirectory, - boolean isPeriodic) throws Exception { + boolean isPeriodic) { // Sanity check if (props.externalizeCheckpoint() && targetDirectory == null) { @@ -415,36 +422,32 @@ public class CheckpointCoordinator { } final PendingCheckpoint checkpoint = new PendingCheckpoint( - job, - checkpointID, - timestamp, - ackTasks, - isPeriodic, - props, - targetDirectory); + job, + checkpointID, + timestamp, + ackTasks, + isPeriodic, + props, + targetDirectory, + executor); // schedule the timer that will clean up the expired checkpoints TimerTask canceller = new TimerTask() { @Override public void run() { - try { - synchronized (lock) { - // only do the work if the checkpoint is not discarded anyways - // note that checkpoint completion discards the pending checkpoint object - if (!checkpoint.isDiscarded()) { - LOG.info("Checkpoint " + checkpointID + " expired before completing."); - - checkpoint.abortExpired(); - pendingCheckpoints.remove(checkpointID); - rememberRecentCheckpointId(checkpointID); - - triggerQueuedRequests(); - } + synchronized (lock) { + // only do the work if the checkpoint is not discarded anyways + // note that checkpoint completion discards the pending checkpoint object + if (!checkpoint.isDiscarded()) { + LOG.info("Checkpoint " + checkpointID + " expired before completing."); + + checkpoint.abortExpired(); + pendingCheckpoints.remove(checkpointID); + rememberRecentCheckpointId(checkpointID); + + triggerQueuedRequests(); } } - catch (Throwable t) { - LOG.error("Exception while handling checkpoint timeout", t); - } } }; @@ -531,7 +534,7 @@ public class CheckpointCoordinator { * * @param message Checkpoint decline from the task manager */ - public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception { + public void receiveDeclineMessage(DeclineCheckpoint message) { if (shutdown || message == null) { return; } @@ -675,12 +678,8 @@ public class CheckpointCoordinator { "the state handle to avoid lingering state.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()); - try { - message.getSubtaskState().discardState(); - } catch (Exception e) { - LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.", - message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e); - } + discardState(message.getSubtaskState()); + break; case DISCARDED: LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " + @@ -688,12 +687,7 @@ public class CheckpointCoordinator { "state handle tp avoid lingering state.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()); - try { - message.getSubtaskState().discardState(); - } catch (Exception e) { - LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.", - message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e); - } + discardState(message.getSubtaskState()); } } else if (checkpoint != null) { @@ -712,13 +706,8 @@ public class CheckpointCoordinator { isPendingCheckpoint = false; } - try { - // try to discard the state so that we don't have lingering state lying around - message.getSubtaskState().discardState(); - } catch (Exception e) { - LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.", - message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e); - } + // try to discard the state so that we don't have lingering state lying around + discardState(message.getSubtaskState()); } } @@ -747,7 +736,7 @@ public class CheckpointCoordinator { recentPendingCheckpoints.addLast(id); } - private void dropSubsumedCheckpoints(long checkpointId) throws Exception { + private void dropSubsumedCheckpoints(long checkpointId) { Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator(); while (entries.hasNext()) { @@ -766,7 +755,7 @@ public class CheckpointCoordinator { * * <p>NOTE: The caller of this method must hold the lock when invoking the method! */ - private void triggerQueuedRequests() throws Exception { + private void triggerQueuedRequests() { if (triggerRequestQueued) { triggerRequestQueued = false; @@ -915,11 +904,7 @@ public class CheckpointCoordinator { } for (PendingCheckpoint p : pendingCheckpoints.values()) { - try { - p.abortError(new Exception("Checkpoint Coordinator is suspending.")); - } catch (Throwable t) { - LOG.error("Error while disposing pending checkpoint", t); - } + p.abortError(new Exception("Checkpoint Coordinator is suspending.")); } pendingCheckpoints.clear(); @@ -959,4 +944,17 @@ public class CheckpointCoordinator { } } } + + private void discardState(final StateObject stateObject) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + stateObject.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard state object.", e); + } + } + }); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 5034502..cfb59f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -84,6 +86,8 @@ public class PendingCheckpoint { /** The promise to fulfill once the checkpoint has been completed. */ private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise = new FlinkCompletableFuture<>(); + private final Executor executor; + private int numAcknowledgedTasks; private boolean discarded; @@ -97,7 +101,8 @@ public class PendingCheckpoint { Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm, boolean isPeriodic, CheckpointProperties props, - String targetDirectory) { + String targetDirectory, + Executor executor) { this.jobId = checkNotNull(jobId); this.checkpointId = checkpointId; this.checkpointTimestamp = checkpointTimestamp; @@ -106,6 +111,7 @@ public class PendingCheckpoint { this.taskStates = new HashMap<>(); this.props = checkNotNull(props); this.targetDirectory = targetDirectory; + this.executor = Preconditions.checkNotNull(executor); // Sanity check if (props.externalizeCheckpoint() && targetDirectory == null) { @@ -324,7 +330,7 @@ public class PendingCheckpoint { /** * Aborts a checkpoint because it expired (took too long). */ - public void abortExpired() throws Exception { + public void abortExpired() { try { onCompletionPromise.completeExceptionally(new Exception("Checkpoint expired before completing")); } finally { @@ -335,7 +341,7 @@ public class PendingCheckpoint { /** * Aborts the pending checkpoint because a newer completed checkpoint subsumed it. */ - public void abortSubsumed() throws Exception { + public void abortSubsumed() { try { if (props.forceCheckpoint()) { onCompletionPromise.completeExceptionally(new Exception("Bug: forced checkpoints must never be subsumed")); @@ -349,7 +355,7 @@ public class PendingCheckpoint { } } - public void abortDeclined() throws Exception { + public void abortDeclined() { try { onCompletionPromise.completeExceptionally(new Exception("Checkpoint was declined (tasks not ready)")); } finally { @@ -361,7 +367,7 @@ public class PendingCheckpoint { * Aborts the pending checkpoint due to an error. * @param cause The error's exception. */ - public void abortError(Throwable cause) throws Exception { + public void abortError(Throwable cause) { try { onCompletionPromise.completeExceptionally(new Exception("Checkpoint failed: " + cause.getMessage(), cause)); } finally { @@ -369,13 +375,24 @@ public class PendingCheckpoint { } } - private void dispose(boolean releaseState) throws Exception { + private void dispose(boolean releaseState) { synchronized (lock) { try { discarded = true; numAcknowledgedTasks = -1; if (releaseState) { - StateUtil.bestEffortDiscardAllStateObjects(taskStates.values()); + executor.execute(new Runnable() { + @Override + public void run() { + try { + StateUtil.bestEffortDiscardAllStateObjects(taskStates.values()); + } catch (Exception e) { + LOG.warn("Could not properly dispose the pending checkpoint " + + "{} of job {}.", checkpointId, jobId, e); + } + } + }); + } } finally { taskStates.clear(); http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java index 1832d70..391f233 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java @@ -18,13 +18,20 @@ package org.apache.flink.runtime.concurrent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** * Collection of {@link Executor} implementations */ public class Executors { + private static final Logger LOG = LoggerFactory.getLogger(Executors.class); + /** * Return a direct executor. The direct executor directly executes the runnable in the calling * thread. @@ -49,4 +56,46 @@ public class Executors { command.run(); } } + + /** + * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that + * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time, + * they will be shut down hard. + * + * @param timeout to wait for the termination of all ExecutorServices + * @param unit of the timeout + * @param executorServices to shut down + */ + public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) { + for (ExecutorService executorService: executorServices) { + executorService.shutdown(); + } + + boolean wasInterrupted = false; + final long endTime = unit.toMillis(timeout) + System.currentTimeMillis(); + long timeLeft = unit.toMillis(timeout); + boolean hasTimeLeft = timeLeft > 0L; + + for (ExecutorService executorService: executorServices) { + if (wasInterrupted || !hasTimeLeft) { + executorService.shutdownNow(); + } else { + try { + if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) { + LOG.warn("ExecutorService did not terminate in time. Shutting it down now."); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while shutting down executor services. Shutting all " + + "remaining ExecutorServices down now.", e); + executorService.shutdownNow(); + + wasInterrupted = true; + } + + timeLeft = endTime - System.currentTimeMillis(); + hasTimeLeft = timeLeft > 0L; + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index f8e894a..cbb4c7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -58,6 +58,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -203,7 +204,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive private CheckpointStatsTracker checkpointStatsTracker; /** The executor which is used to execute futures. */ - private Executor executor; + private final Executor futureExecutor; + + /** The executor which is used to execute blocking io operations */ + private final Executor ioExecutor; /** Registered KvState instances reported by the TaskManagers. */ private KvStateLocationRegistry kvStateLocationRegistry; @@ -219,7 +223,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive * This constructor is for tests only, because it does not include class loading information. */ ExecutionGraph( - Executor executor, + Executor futureExecutor, + Executor ioExecutor, JobID jobId, String jobName, Configuration jobConfig, @@ -227,7 +232,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive Time timeout, RestartStrategy restartStrategy) throws IOException { this( - executor, + futureExecutor, + ioExecutor, jobId, jobName, jobConfig, @@ -242,7 +248,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } public ExecutionGraph( - Executor executor, + Executor futureExecutor, + Executor ioExecutor, JobID jobId, String jobName, Configuration jobConfig, @@ -254,7 +261,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive ClassLoader userClassLoader, MetricGroup metricGroup) throws IOException { - checkNotNull(executor); + checkNotNull(futureExecutor); checkNotNull(jobId); checkNotNull(jobName); checkNotNull(jobConfig); @@ -271,7 +278,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive // serialize the job information to do the serialisation work only once this.serializedJobInformation = new SerializedValue<>(jobInformation); - this.executor = executor; + this.futureExecutor = Preconditions.checkNotNull(futureExecutor); + this.ioExecutor = Preconditions.checkNotNull(ioExecutor); this.userClassLoader = userClassLoader; @@ -365,19 +373,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive // create the coordinator that triggers and commits checkpoints and holds the state checkpointCoordinator = new CheckpointCoordinator( - jobInformation.getJobId(), - interval, - checkpointTimeout, - minPauseBetweenCheckpoints, - maxConcurrentCheckpoints, - externalizeSettings, - tasksToTrigger, - tasksToWaitFor, - tasksToCommitTo, - checkpointIDCounter, - checkpointStore, - checkpointDir, - checkpointStatsTracker); + jobInformation.getJobId(), + interval, + checkpointTimeout, + minPauseBetweenCheckpoints, + maxConcurrentCheckpoints, + externalizeSettings, + tasksToTrigger, + tasksToWaitFor, + tasksToCommitTo, + checkpointIDCounter, + checkpointStore, + checkpointDir, + checkpointStatsTracker, + ioExecutor); // interval of max long value indicates disable periodic checkpoint, // the CheckpointActivatorDeactivator should be created only if the interval is not max value @@ -589,8 +598,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive * * @return ExecutionContext associated with this ExecutionGraph */ - public Executor getExecutor() { - return executor; + public Executor getFutureExecutor() { + return futureExecutor; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 3be1d56..a1d7385 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -53,7 +53,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * Utility class to encapsulate the logic of building an {@link ExecutionGraph} from a {@link JobGraph}. */ public class ExecutionGraphBuilder { - /** + /** * Builds the ExecutionGraph from the JobGraph. * If a prior execution graph exists, the JobGraph will be attached. If no prior execution * graph exists, then the JobGraph will become attach to a new emoty execution graph. @@ -62,7 +62,8 @@ public class ExecutionGraphBuilder { @Nullable ExecutionGraph prior, JobGraph jobGraph, Configuration jobManagerConfig, - Executor executor, + Executor futureExecutor, + Executor ioExecutor, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, Time timeout, @@ -83,17 +84,18 @@ public class ExecutionGraphBuilder { try { executionGraph = (prior != null) ? prior : new ExecutionGraph( - executor, - jobId, - jobName, - jobGraph.getJobConfiguration(), - jobGraph.getSerializedExecutionConfig(), - timeout, - restartStrategy, - jobGraph.getUserJarBlobKeys(), - jobGraph.getClasspaths(), - classLoader, - metrics); + futureExecutor, + ioExecutor, + jobId, + jobName, + jobGraph.getJobConfiguration(), + jobGraph.getSerializedExecutionConfig(), + timeout, + restartStrategy, + jobGraph.getUserJarBlobKeys(), + jobGraph.getClasspaths(), + classLoader, + metrics); } catch (IOException e) { throw new JobException("Could not create the execution graph.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 01e8660..39c60d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -128,7 +128,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi this.priorExecutions = new CopyOnWriteArrayList<Execution>(); this.currentExecution = new Execution( - getExecutionGraph().getExecutor(), + getExecutionGraph().getFutureExecutor(), this, 0, createTimestamp, @@ -435,7 +435,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi if (state == FINISHED || state == CANCELED || state == FAILED) { priorExecutions.add(execution); currentExecution = new Execution( - getExecutionGraph().getExecutor(), + getExecutionGraph().getFutureExecutor(), this, execution.getAttemptNumber()+1, System.currentTimeMillis(), http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java index 3962e91..10546a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java @@ -70,7 +70,7 @@ public class FailureRateRestartStrategy implements RestartStrategy { restartTimestampsDeque.remove(); } restartTimestampsDeque.add(System.currentTimeMillis()); - FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getExecutor()); + FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getFutureExecutor()); } private boolean isRestartTimestampsQueueFull() { http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java index 5337c6a..f51ea7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java @@ -58,7 +58,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { @Override public void restart(final ExecutionGraph executionGraph) { currentRestartAttempt++; - FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getExecutor()); + FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getFutureExecutor()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala index 0f31eba..cbe80f1 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala @@ -45,8 +45,9 @@ import scala.language.postfixOps * to start/administer/stop the session. * * @param flinkConfiguration Configuration object for the actor - * @param executor Execution context which is used to execute concurrent tasks in the + * @param futureExecutor Execution context which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param ioExecutor to execute blocking io operations * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] * @param scheduler Scheduler to schedule Flink jobs @@ -58,7 +59,8 @@ import scala.language.postfixOps */ abstract class ContaineredJobManager( flinkConfiguration: Configuration, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -72,7 +74,8 @@ abstract class ContaineredJobManager( metricsRegistry: Option[FlinkMetricRegistry]) extends JobManager( flinkConfiguration, - executor, + futureExecutor, + ioExecutor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 08ed0a4..197456f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -44,7 +44,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.concurrent.{AcceptFunction, BiFunction} +import org.apache.flink.runtime.concurrent.{AcceptFunction, BiFunction, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory @@ -118,7 +118,8 @@ import scala.language.postfixOps */ class JobManager( protected val flinkConfiguration: Configuration, - protected val executor: Executor, + protected val futureExecutor: Executor, + protected val ioExecutor: Executor, protected val instanceManager: InstanceManager, protected val scheduler: FlinkScheduler, protected val libraryCacheManager: BlobLibraryCacheManager, @@ -1247,7 +1248,8 @@ class JobManager( executionGraph, jobGraph, flinkConfiguration, - executor, + futureExecutor, + ioExecutor, userCodeLoader, checkpointRecoveryFactory, Time.of(timeout.length, timeout.unit), @@ -1976,8 +1978,9 @@ object JobManager { val ioExecutor = Executors.newFixedThreadPool( numberProcessors, - new NamedThreadFactory("jobmanager-io-", "-thread-") - ) + new NamedThreadFactory("jobmanager-io-", "-thread-")) + + val timeout = AkkaUtils.getTimeout(configuration) val (jobManagerSystem, _, _, webMonitorOption, _) = try { startActorSystemAndJobManagerActors( @@ -1993,7 +1996,8 @@ object JobManager { ) } catch { case t: Throwable => - futureExecutor.shutdownNow() + futureExecutor.shutdownNow() + ioExecutor.shutdownNow() throw t } @@ -2011,8 +2015,11 @@ object JobManager { } } - futureExecutor.shutdownNow() - ioExecutor.shutdownNow() + FlinkExecutors.gracefulShutdown( + timeout.toMillis, + TimeUnit.MILLISECONDS, + futureExecutor, + ioExecutor) } /** @@ -2620,6 +2627,7 @@ object JobManager { jobManagerClass, configuration, futureExecutor, + ioExecutor, instanceManager, scheduler, libraryCacheManager, @@ -2653,7 +2661,8 @@ object JobManager { def getJobManagerProps( jobManagerClass: Class[_ <: JobManager], configuration: Configuration, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -2669,7 +2678,8 @@ object JobManager { Props( jobManagerClass, configuration, - executor, + futureExecutor, + ioExecutor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 4367442..dc59048 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -20,16 +20,18 @@ package org.apache.flink.runtime.minicluster import java.net.InetAddress import java.util.UUID -import java.util.concurrent.{Executors, ForkJoinPool} +import java.util.concurrent.{Executors, TimeUnit} import akka.pattern.Patterns.gracefulStop import akka.pattern.ask import akka.actor.{ActorRef, ActorSystem} import com.typesafe.config.Config +import org.apache.flink.api.common.time.Time import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult} import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.client.{JobClient, JobExecutionException} +import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors} import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.HighAvailabilityMode @@ -409,8 +411,11 @@ abstract class FlinkMiniCluster( jobManagerLeaderRetrievalService.foreach(_.stop()) isRunning = false - futureExecutor.shutdownNow() - ioExecutor.shutdownNow() + FlinkExecutors.gracefulShutdown( + timeout.toMillis, + TimeUnit.MILLISECONDS, + futureExecutor, + ioExecutor) } protected def shutdown(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index b2aedf7..09deadc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -18,12 +18,12 @@ package org.apache.flink.runtime.minicluster -import java.util.concurrent.ExecutorService +import java.util.concurrent.{Executor, ExecutorService} import akka.actor.{ActorRef, ActorSystem, Props} import org.apache.flink.api.common.JobID import org.apache.flink.api.common.io.FileOutputFormat -import org.apache.flink.configuration.{QueryableStateOptions, ConfigConstants, Configuration} +import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions} import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager @@ -141,6 +141,7 @@ class LocalFlinkMiniCluster( jobManagerClass, config, futureExecutor, + ioExecutor, instanceManager, scheduler, libraryCacheManager, @@ -242,25 +243,28 @@ class LocalFlinkMiniCluster( } def getJobManagerProps( - jobManagerClass: Class[_ <: JobManager], - configuration: Configuration, - executorService: ExecutorService, - instanceManager: InstanceManager, - scheduler: Scheduler, - libraryCacheManager: BlobLibraryCacheManager, - archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, - timeout: FiniteDuration, - leaderElectionService: LeaderElectionService, - submittedJobGraphStore: SubmittedJobGraphStore, - checkpointRecoveryFactory: CheckpointRecoveryFactory, - jobRecoveryTimeout: FiniteDuration, - metricsRegistry: Option[MetricRegistry]): Props = { + jobManagerClass: Class[_ <: JobManager], + configuration: Configuration, + futureExecutor: Executor, + ioExecutor: Executor, + instanceManager: InstanceManager, + scheduler: Scheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphStore: SubmittedJobGraphStore, + checkpointRecoveryFactory: CheckpointRecoveryFactory, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[MetricRegistry]) + : Props = { JobManager.getJobManagerProps( jobManagerClass, configuration, - executorService, + futureExecutor, + ioExecutor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index a59ffa2..8e46f4c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; @@ -116,19 +117,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, - new ExecutionVertex[] { ackVertex1, ackVertex2 }, - new ExecutionVertex[] {}, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, + new ExecutionVertex[] { ackVertex1, ackVertex2 }, + new ExecutionVertex[] {}, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -169,19 +171,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, - new ExecutionVertex[] { ackVertex1, ackVertex2 }, - new ExecutionVertex[] {}, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, + new ExecutionVertex[] { ackVertex1, ackVertex2 }, + new ExecutionVertex[] {}, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -220,19 +223,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, - new ExecutionVertex[] { ackVertex1, ackVertex2 }, - new ExecutionVertex[] {}, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, + new ExecutionVertex[] { ackVertex1, ackVertex2 }, + new ExecutionVertex[] {}, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -272,19 +276,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -370,19 +375,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -487,19 +493,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -633,19 +640,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, - new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, - new ExecutionVertex[] { commitVertex }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, + new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -767,19 +775,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, - new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, - new ExecutionVertex[] { commitVertex }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(10), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, + new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(10), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -888,19 +897,20 @@ public class CheckpointCoordinatorTest { // the timeout for the checkpoint is a 200 milliseconds CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 200, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex1, ackVertex2 }, - new ExecutionVertex[] { commitVertex }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 200, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex1, ackVertex2 }, + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); // trigger a checkpoint, partially acknowledged assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -956,19 +966,20 @@ public class CheckpointCoordinatorTest { ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 200000, - 200000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex1, ackVertex2 }, - new ExecutionVertex[] { commitVertex }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2), - null, - new DisabledCheckpointStatsTracker()); + jid, + 200000, + 200000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex1, ackVertex2 }, + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1033,7 +1044,8 @@ public class CheckpointCoordinatorTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - new DisabledCheckpointStatsTracker()); + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1145,19 +1157,20 @@ public class CheckpointCoordinatorTest { }).when(execution).triggerCheckpoint(anyLong(), anyLong()); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 10, // periodic interval is 10 ms - 200000, // timeout is very long (200 s) - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2), - null, - new DisabledCheckpointStatsTracker()); + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); coord.startCheckpointScheduler(); @@ -1236,19 +1249,20 @@ public class CheckpointCoordinatorTest { }).when(execution).triggerCheckpoint(anyLong(), anyLong()); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 10, // periodic interval is 10 ms - 200000, // timeout is very long (200 s) - 500, // 500ms delay between checkpoints - 10, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2), - null, - new DisabledCheckpointStatsTracker()); + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 500, // 500ms delay between checkpoints + 10, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); coord.startCheckpointScheduler(); @@ -1321,19 +1335,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1456,19 +1471,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - counter, - new StandaloneCompletedCheckpointStore(10), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + counter, + new StandaloneCompletedCheckpointStore(10), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); String savepointDir = tmpFolder.newFolder().getAbsolutePath(); @@ -1559,19 +1575,20 @@ public class CheckpointCoordinatorTest { }).when(execution).notifyCheckpointComplete(anyLong(), anyLong()); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 10, // periodic interval is 10 ms - 200000, // timeout is very long (200 s) - 0L, // no extra delay - maxConcurrentAttempts, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2), - null, - new DisabledCheckpointStatsTracker()); + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0L, // no extra delay + maxConcurrentAttempts, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); coord.startCheckpointScheduler(); @@ -1632,19 +1649,20 @@ public class CheckpointCoordinatorTest { ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 10, // periodic interval is 10 ms - 200000, // timeout is very long (200 s) - 0L, // no extra delay - maxConcurrentAttempts, // max two concurrent checkpoints - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2), - null, - new DisabledCheckpointStatsTracker()); + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0L, // no extra delay + maxConcurrentAttempts, // max two concurrent checkpoints + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); coord.startCheckpointScheduler(); @@ -1714,19 +1732,20 @@ public class CheckpointCoordinatorTest { }); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 10, // periodic interval is 10 ms - 200000, // timeout is very long (200 s) - 0L, // no extra delay - 2, // max two concurrent checkpoints - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2), - null, - new DisabledCheckpointStatsTracker()); + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0L, // no extra delay + 2, // max two concurrent checkpoints + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); coord.startCheckpointScheduler(); @@ -1766,19 +1785,20 @@ public class CheckpointCoordinatorTest { StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter(); CheckpointCoordinator coord = new CheckpointCoordinator( - jobId, - 100000, - 200000, - 0L, - 1, // max one checkpoint at a time => should not affect savepoints - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - checkpointIDCounter, - new StandaloneCompletedCheckpointStore(2), - null, - new DisabledCheckpointStatsTracker()); + jobId, + 100000, + 200000, + 0L, + 1, // max one checkpoint at a time => should not affect savepoints + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + checkpointIDCounter, + new StandaloneCompletedCheckpointStore(2), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>(); @@ -1819,19 +1839,20 @@ public class CheckpointCoordinatorTest { ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); CheckpointCoordinator coord = new CheckpointCoordinator( - jobId, - 100000, - 200000, - 100000000L, // very long min delay => should not affect savepoints - 1, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2), - null, - new DisabledCheckpointStatsTracker()); + jobId, + 100000, + 200000, + 100000000L, // very long min delay => should not affect savepoints + 1, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); String savepointDir = tmpFolder.newFolder().getAbsolutePath(); @@ -1879,19 +1900,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - arrayExecutionVertices, - arrayExecutionVertices, - arrayExecutionVertices, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + arrayExecutionVertices, + arrayExecutionVertices, + arrayExecutionVertices, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -1984,19 +2006,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - arrayExecutionVertices, - arrayExecutionVertices, - arrayExecutionVertices, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + arrayExecutionVertices, + arrayExecutionVertices, + arrayExecutionVertices, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2099,19 +2122,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - arrayExecutionVertices, - arrayExecutionVertices, - arrayExecutionVertices, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + arrayExecutionVertices, + arrayExecutionVertices, + arrayExecutionVertices, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2234,19 +2258,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - arrayExecutionVertices, - arrayExecutionVertices, - arrayExecutionVertices, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + arrayExecutionVertices, + arrayExecutionVertices, + arrayExecutionVertices, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2365,19 +2390,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.externalizeCheckpoints(true), - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - "fake-directory", - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.externalizeCheckpoints(true), + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + "fake-directory", + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -2741,19 +2767,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - new JobID(), - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + new JobID(), + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); // Periodic CheckpointTriggerResult triggerResult = coord.triggerCheckpoint( http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 6e5279b..7cea130 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -40,7 +41,6 @@ import org.hamcrest.Description; import org.junit.Test; import org.mockito.Mockito; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -97,19 +97,20 @@ public class CheckpointStateRestoreTest { map.put(statelessId, stateless); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 200000L, - 200000L, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, - new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, - new ExecutionVertex[0], - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + jid, + 200000L, + 200000L, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, + new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, + new ExecutionVertex[0], + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); // create ourselves a checkpoint with state final long timestamp = 34623786L; @@ -172,19 +173,20 @@ public class CheckpointStateRestoreTest { public void testNoCheckpointAvailable() { try { CheckpointCoordinator coord = new CheckpointCoordinator( - new JobID(), - 200000L, - 200000L, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { mock(ExecutionVertex.class) }, - new ExecutionVertex[] { mock(ExecutionVertex.class) }, - new ExecutionVertex[0], - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + new JobID(), + 200000L, + 200000L, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { mock(ExecutionVertex.class) }, + new ExecutionVertex[] { mock(ExecutionVertex.class) }, + new ExecutionVertex[0], + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); try { coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false); @@ -227,19 +229,20 @@ public class CheckpointStateRestoreTest { tasks.put(jobVertexId2, jobVertex2); CheckpointCoordinator coord = new CheckpointCoordinator( - new JobID(), - Integer.MAX_VALUE, - Integer.MAX_VALUE, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] {}, - new ExecutionVertex[] {}, - new ExecutionVertex[] {}, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); + new JobID(), + Integer.MAX_VALUE, + Integer.MAX_VALUE, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] {}, + new ExecutionVertex[] {}, + new ExecutionVertex[] {}, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest .generateChainedStateHandle(new SerializableObject()); http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index c8c9350..c6c7ae5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -97,6 +97,7 @@ public class ExecutionGraphCheckpointCoordinatorTest { CompletedCheckpointStore store) throws Exception { ExecutionGraph executionGraph = new ExecutionGraph( TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), new JobID(), "test", new Configuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 84f0e1f..e918965 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -210,7 +211,15 @@ public class PendingCheckpointTest { private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) { Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS); - return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, false, props, targetDirectory); + return new PendingCheckpoint( + new JobID(), + 0, + 1, + ackTasks, + false, + props, + targetDirectory, + Executors.directExecutor()); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java index 34043eb..0223a2e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java @@ -50,7 +50,7 @@ public class AllVerticesIteratorTest { v4.setParallelism(2); ExecutionGraph eg = Mockito.mock(ExecutionGraph.class); - Mockito.when(eg.getExecutor()).thenReturn(TestingUtils.directExecutionContext()); + Mockito.when(eg.getFutureExecutor()).thenReturn(TestingUtils.directExecutionContext()); ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1, AkkaUtils.getDefaultTimeout());
