http://git-wip-us.apache.org/repos/asf/flink/blob/0615b62f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 3b8fc97..e11f3a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -18,38 +18,32 @@ package org.apache.flink.runtime.jobmaster; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; -import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; -import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; -import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.client.SerializedJobExecutionResult; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; -import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -59,18 +53,13 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; -import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse; -import org.apache.flink.runtime.jobmaster.message.NextInputSplit; -import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; @@ -83,25 +72,27 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.StartStoppable; +import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedValue; -import org.slf4j.Logger; -import scala.concurrent.ExecutionContext$; -import scala.concurrent.duration.FiniteDuration; +import org.slf4j.Logger; +import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -112,16 +103,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * It offers the following methods as part of its rpc interface to interact with the JobMaster * remotely: * <ul> - * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for + * <li>{@link #updateTaskExecutionState} updates the task execution state for * given task</li> * </ul> */ public class JobMaster extends RpcEndpoint<JobMasterGateway> { + private static final AtomicReferenceFieldUpdater<JobMaster, UUID> LEADER_ID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(JobMaster.class, UUID.class, "leaderSessionID"); + + // ------------------------------------------------------------------------ + /** Logical representation of the job */ private final JobGraph jobGraph; - /** Configuration of the job */ + /** Configuration of the JobManager */ private final Configuration configuration; /** Service to contend for and retrieve the leadership of JM and RM */ @@ -130,37 +126,24 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { /** Blob cache manager used across jobs */ private final BlobLibraryCacheManager libraryCacheManager; - /** Factory to create restart strategy for this job */ - private final RestartStrategyFactory restartStrategyFactory; - - /** Store for save points */ - private final SavepointStore savepointStore; - - /** The timeout for this job */ - private final Time timeout; - - /** The scheduler to use for scheduling new tasks as they are needed */ - private final Scheduler scheduler; + /** The metrics for the JobManager itself */ + private final MetricGroup jobManagerMetricGroup; - /** The metrics group used across jobs */ - private final JobManagerMetricGroup jobManagerMetricGroup; + /** The metrics for the job */ + private final MetricGroup jobMetricGroup; /** The execution context which is used to execute futures */ - private final Executor executionContext; + private final ExecutorService executionContext; private final OnCompletionActions jobCompletionActions; - /** The execution graph of this job */ - private volatile ExecutionGraph executionGraph; - - /** The checkpoint recovery factory used by this job */ - private CheckpointRecoveryFactory checkpointRecoveryFactory; + private final FatalErrorHandler errorHandler; - private ClassLoader userCodeLoader; + private final ClassLoader userCodeLoader; - private RestartStrategy restartStrategy; + /** The execution graph of this job */ + private final ExecutionGraph executionGraph; - private MetricGroup jobMetrics; private volatile UUID leaderSessionID; @@ -170,22 +153,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { private LeaderRetrievalService resourceManagerLeaderRetriever; /** Connection with ResourceManager, null if not located address yet or we close it initiative */ - private volatile ResourceManagerConnection resourceManagerConnection; + private ResourceManagerConnection resourceManagerConnection; + + // TODO - we need to replace this with the slot pool + private final Scheduler scheduler; // ------------------------------------------------------------------------ public JobMaster( - JobGraph jobGraph, - Configuration configuration, - RpcService rpcService, - HighAvailabilityServices highAvailabilityService, - BlobLibraryCacheManager libraryCacheManager, - RestartStrategyFactory restartStrategyFactory, - SavepointStore savepointStore, - Time timeout, - Scheduler scheduler, - JobManagerMetricGroup jobManagerMetricGroup, - OnCompletionActions jobCompletionActions) + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityService, + ExecutorService executorService, + BlobLibraryCacheManager libraryCacheManager, + RestartStrategyFactory restartStrategyFactory, + Time rpcAskTimeout, + @Nullable JobManagerMetricGroup jobManagerMetricGroup, + OnCompletionActions jobCompletionActions, + FatalErrorHandler errorHandler, + ClassLoader userCodeLoader) throws Exception { super(rpcService); @@ -193,289 +180,149 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { this.configuration = checkNotNull(configuration); this.highAvailabilityServices = checkNotNull(highAvailabilityService); this.libraryCacheManager = checkNotNull(libraryCacheManager); - this.restartStrategyFactory = checkNotNull(restartStrategyFactory); - this.savepointStore = checkNotNull(savepointStore); - this.timeout = checkNotNull(timeout); - this.scheduler = checkNotNull(scheduler); - this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup); - this.executionContext = checkNotNull(rpcService.getExecutor()); + this.executionContext = checkNotNull(executorService); this.jobCompletionActions = checkNotNull(jobCompletionActions); - } + this.errorHandler = checkNotNull(errorHandler); + this.userCodeLoader = checkNotNull(userCodeLoader); - //---------------------------------------------------------------------------------------------- - // Lifecycle management - //---------------------------------------------------------------------------------------------- - - /** - * Initializing the job execution environment, should be called before start. Any error occurred during - * initialization will be treated as job submission failure. - * - * @throws JobSubmissionException - */ - public void init() throws JobSubmissionException { - log.info("Initializing job {} ({}).", jobGraph.getName(), jobGraph.getJobID()); + final String jobName = jobGraph.getName(); + final JobID jid = jobGraph.getJobID(); - try { - // IMPORTANT: We need to make sure that the library registration is the first action, - // because this makes sure that the uploaded jar files are removed in case of - // unsuccessful - try { - libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), - jobGraph.getClasspaths()); - } catch (Throwable t) { - throw new JobSubmissionException(jobGraph.getJobID(), - "Cannot set up the user code libraries: " + t.getMessage(), t); - } + if (jobManagerMetricGroup != null) { + this.jobManagerMetricGroup = jobManagerMetricGroup; + this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph); + } else { + this.jobManagerMetricGroup = new UnregisteredMetricsGroup(); + this.jobMetricGroup = new UnregisteredMetricsGroup(); + } - userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID()); - if (userCodeLoader == null) { - throw new JobSubmissionException(jobGraph.getJobID(), - "The user code class loader could not be initialized."); - } + log.info("Initializing job {} ({}).", jobName, jid); - if (jobGraph.getNumberOfVertices() == 0) { - throw new JobSubmissionException(jobGraph.getJobID(), "The given job is empty"); - } - - final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = + final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = jobGraph.getSerializedExecutionConfig() - .deserializeValue(userCodeLoader) - .getRestartStrategy(); - if (restartStrategyConfiguration != null) { - restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration); - } - else { - restartStrategy = restartStrategyFactory.createRestartStrategy(); - } + .deserializeValue(userCodeLoader) + .getRestartStrategy(); - log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobGraph.getName(), jobGraph.getJobID()); + final RestartStrategy restartStrategy = (restartStrategyConfiguration != null) ? + RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) : + restartStrategyFactory.createRestartStrategy(); - if (jobManagerMetricGroup != null) { - jobMetrics = jobManagerMetricGroup.addJob(jobGraph); - } - if (jobMetrics == null) { - jobMetrics = new UnregisteredMetricsGroup(); - } + log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid); - try { - checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); - } catch (Exception e) { - log.error("Could not get the checkpoint recovery factory.", e); - throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e); - } + CheckpointRecoveryFactory checkpointRecoveryFactory; + try { + checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); + } catch (Exception e) { + log.error("Could not create the access to highly-available checkpoint storage.", e); + throw new Exception("Could not create the access to highly-available checkpoint storage.", e); + } - try { - resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); - } catch (Exception e) { - log.error("Could not get the resource manager leader retriever.", e); - throw new JobSubmissionException(jobGraph.getJobID(), + try { + resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); + } catch (Exception e) { + log.error("Could not get the resource manager leader retriever.", e); + throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the resource manager leader retriever.", e); - } - } catch (Throwable t) { - log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t); + } - libraryCacheManager.unregisterJob(jobGraph.getJobID()); + this.executionGraph = ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + configuration, + executorService, + userCodeLoader, + checkpointRecoveryFactory, + rpcAskTimeout, + restartStrategy, + jobMetricGroup, + -1, + log); - if (t instanceof JobSubmissionException) { - throw (JobSubmissionException) t; - } - else { - throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " + - jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t); - } - } + // TODO - temp fix + this.scheduler = new Scheduler(executorService); } + //---------------------------------------------------------------------------------------------- + // Lifecycle management + //---------------------------------------------------------------------------------------------- + + @Override public void start() { - super.start(); + throw new UnsupportedOperationException("Should never call start() without leader ID"); } + /** + * Start the rpc service and begin to run the job. + * + * @param leaderSessionID The necessary leader id for running the job. + */ + public void start(final UUID leaderSessionID) { + if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) { + super.start(); + + log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID()); + getSelf().startJobExecution(); + } else { + log.warn("Job already started with leaderId {}, ignoring this start request.", leaderSessionID); + } + } + + /** + * Suspend the job and shutdown all other services including rpc. + */ @Override public void shutDown() { + // make sure there is a graceful exit + getSelf().suspendExecution(new Exception("JobManager is shutting down.")); super.shutDown(); - - suspendJob(new Exception("JobManager is shutting down.")); - - disposeCommunicationWithResourceManager(); } - - //---------------------------------------------------------------------------------------------- // RPC methods //---------------------------------------------------------------------------------------------- - /** - * Start to run the job, runtime data structures like ExecutionGraph will be constructed now and checkpoint - * being recovered. After this, we will begin to schedule the job. - */ + //-- job starting and stopping ----------------------------------------------------------------- + @RpcMethod - public void startJob(final UUID leaderSessionID) { - log.info("Starting job {} ({}) with leaderId {}.", jobGraph.getName(), jobGraph.getJobID(), leaderSessionID); - - this.leaderSessionID = leaderSessionID; - - if (executionGraph != null) { - executionGraph = new ExecutionGraph( - ExecutionContext$.MODULE$.fromExecutor(executionContext), - jobGraph.getJobID(), - jobGraph.getName(), - jobGraph.getJobConfiguration(), - jobGraph.getSerializedExecutionConfig(), - new FiniteDuration(timeout.getSize(), timeout.getUnit()), - restartStrategy, - jobGraph.getUserJarBlobKeys(), - jobGraph.getClasspaths(), - userCodeLoader, - jobMetrics); - } - else { - // TODO: update last active time in JobInfo - } + public void startJobExecution() { + log.info("Starting execution of job {} ({}) with leaderId {}.", + jobGraph.getName(), jobGraph.getJobID(), leaderSessionID); try { - executionGraph.setScheduleMode(jobGraph.getScheduleMode()); - executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling()); - - try { - executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph)); - } catch (Exception e) { - log.warn("Cannot create JSON plan for job {} ({})", jobGraph.getJobID(), jobGraph.getName(), e); - executionGraph.setJsonPlan("{}"); - } - - // initialize the vertices that have a master initialization hook - // file output formats create directories here, input formats create splits - if (log.isDebugEnabled()) { - log.debug("Running initialization on master for job {} ({}).", jobGraph.getJobID(), jobGraph.getName()); - } - for (JobVertex vertex : jobGraph.getVertices()) { - final String executableClass = vertex.getInvokableClassName(); - if (executableClass == null || executableClass.length() == 0) { - throw new JobExecutionException(jobGraph.getJobID(), - "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class."); - } - if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) { - vertex.setParallelism(scheduler.getTotalNumberOfSlots()); - } - - try { - vertex.initializeOnMaster(userCodeLoader); - } catch (Throwable t) { - throw new JobExecutionException(jobGraph.getJobID(), - "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t); - } - } - - // topologically sort the job vertices and attach the graph to the existing one - final List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); - if (log.isDebugEnabled()) { - log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), - jobGraph.getJobID(), jobGraph.getName()); - } - executionGraph.attachJobGraph(sortedTopology); - - if (log.isDebugEnabled()) { - log.debug("Successfully created execution graph from job graph {} ({}).", - jobGraph.getJobID(), jobGraph.getName()); - } - - final JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings(); - if (snapshotSettings != null) { - List<ExecutionJobVertex> triggerVertices = getExecutionJobVertexWithId( - executionGraph, snapshotSettings.getVerticesToTrigger()); - - List<ExecutionJobVertex> ackVertices = getExecutionJobVertexWithId( - executionGraph, snapshotSettings.getVerticesToAcknowledge()); - - List<ExecutionJobVertex> confirmVertices = getExecutionJobVertexWithId( - executionGraph, snapshotSettings.getVerticesToConfirm()); - - CompletedCheckpointStore completedCheckpoints = checkpointRecoveryFactory.createCheckpointStore( - jobGraph.getJobID(), userCodeLoader); - - CheckpointIDCounter checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter( - jobGraph.getJobID()); - - // Checkpoint stats tracker - boolean isStatsDisabled = configuration.getBoolean( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE); - - final CheckpointStatsTracker checkpointStatsTracker; - if (isStatsDisabled) { - checkpointStatsTracker = new DisabledCheckpointStatsTracker(); - } - else { - int historySize = configuration.getInteger( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); - checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics); - } - - executionGraph.enableSnapshotCheckpointing( - snapshotSettings.getCheckpointInterval(), - snapshotSettings.getCheckpointTimeout(), - snapshotSettings.getMinPauseBetweenCheckpoints(), - snapshotSettings.getMaxConcurrentCheckpoints(), - triggerVertices, - ackVertices, - confirmVertices, - checkpointIdCounter, - completedCheckpoints, - savepointStore, - checkpointStatsTracker); - } - - // TODO: register this class to execution graph as job status change listeners - - // TODO: register client as job / execution status change listeners if they are interested - - /* - TODO: decide whether we should take the savepoint before recovery - - if (isRecovery) { - // this is a recovery of a master failure (this master takes over) - executionGraph.restoreLatestCheckpointedState(); - } else { - if (snapshotSettings != null) { - String savepointPath = snapshotSettings.getSavepointPath(); - if (savepointPath != null) { - // got a savepoint - log.info("Starting job from savepoint {}.", savepointPath); - - // load the savepoint as a checkpoint into the system - final CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint( - jobGraph.getJobID(), executionGraph.getAllVertices(), savepointStore, savepointPath); - executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint); - - // Reset the checkpoint ID counter - long nextCheckpointId = savepoint.getCheckpointID() + 1; - log.info("Reset the checkpoint ID to " + nextCheckpointId); - executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId); - - executionGraph.restoreLatestCheckpointedState(); - } + // register self as job status change listener + executionGraph.registerJobStatusListener(new JobStatusListener() { + @Override + public void jobStatusChanges( + final JobID jobId, final JobStatus newJobStatus, final long timestamp, final Throwable error) + { + // run in rpc thread to avoid concurrency + runAsync(new Runnable() { + @Override + public void run() { + jobStatusChanged(newJobStatus, timestamp, error); + } + }); } - } - */ + }); - // job is good to go, try to locate resource manager's address + // job is ready to go, try to establish connection with resource manager resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); } catch (Throwable t) { + + // TODO - this should not result in a job failure, but another leader should take over + // TODO - either this master should retry the execution, or it should relinquish leadership / terminate + log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t); executionGraph.fail(t); - executionGraph = null; - final Throwable rt; + final JobExecutionException rt; if (t instanceof JobExecutionException) { rt = (JobExecutionException) t; - } - else { + } else { rt = new JobExecutionException(jobGraph.getJobID(), - "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t); + "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t); } // TODO: notify client about this failure @@ -488,34 +335,51 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { executionContext.execute(new Runnable() { @Override public void run() { - if (executionGraph != null) { - try { - executionGraph.scheduleForExecution(scheduler); - } catch (Throwable t) { - executionGraph.fail(t); - } + try { + executionGraph.scheduleForExecution(scheduler); + } catch (Throwable t) { + executionGraph.fail(t); } } }); } /** - * Suspending job, all the running tasks will be cancelled, and runtime data will be cleared. + * Suspending job, all the running tasks will be cancelled, and communication with other components + * will be disposed. + * + * <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by + * calling the {@link #start(UUID)} method once we take the leadership back again. * * @param cause The reason of why this job been suspended. */ @RpcMethod - public void suspendJob(final Throwable cause) { + public void suspendExecution(final Throwable cause) { + if (leaderSessionID == null) { + log.debug("Job has already been suspended or shutdown."); + return; + } + + // receive no more messages until started again, should be called before we clear self leader id + ((StartStoppable) getSelf()).stop(); + leaderSessionID = null; + executionGraph.suspend(cause); - if (executionGraph != null) { - executionGraph.suspend(cause); - executionGraph = null; + // disconnect from resource manager: + try { + resourceManagerLeaderRetriever.stop(); + } catch (Exception e) { + log.warn("Failed to stop resource manager leader retriever when suspending."); } + closeResourceManagerConnection(); + + // TODO: disconnect from all registered task managers - disposeCommunicationWithResourceManager(); } + //---------------------------------------------------------------------------------------------- + /** * Updates the task execution state for a given task. * @@ -523,24 +387,38 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { * @return Acknowledge the task execution state update */ @RpcMethod - public Acknowledge updateTaskExecutionState(final TaskExecutionState taskExecutionState) throws ExecutionGraphException { + public Acknowledge updateTaskExecutionState( + final UUID leaderSessionID, + final TaskExecutionState taskExecutionState) throws Exception + { if (taskExecutionState == null) { throw new NullPointerException("TaskExecutionState must not be null."); } + if (!this.leaderSessionID.equals(leaderSessionID)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderSessionID); + } + if (executionGraph.updateState(taskExecutionState)) { return Acknowledge.get(); } else { throw new ExecutionGraphException("The execution attempt " + - taskExecutionState.getID() + " was not found."); + taskExecutionState.getID() + " was not found."); } } @RpcMethod public SerializedInputSplit requestNextInputSplit( - final JobVertexID vertexID, - final ExecutionAttemptID executionAttempt) throws Exception + final UUID leaderSessionID, + final JobVertexID vertexID, + final ExecutionAttemptID executionAttempt) throws Exception { + if (!this.leaderSessionID.equals(leaderSessionID)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderSessionID); + } + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); if (execution == null) { // can happen when JobManager had already unregistered this execution upon on task failure, @@ -579,7 +457,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } catch (Exception ex) { log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); IOException reason = new IOException("Could not serialize the next input split of class " + - nextInputSplit.getClass() + ".", ex); + nextInputSplit.getClass() + ".", ex); vertex.fail(reason); throw reason; } @@ -587,17 +465,31 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @RpcMethod public PartitionState requestPartitionState( - final ResultPartitionID partitionId, - final ExecutionAttemptID taskExecutionId, - final IntermediateDataSetID taskResultId) + final UUID leaderSessionID, + final ResultPartitionID partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId) throws Exception { + if (!this.leaderSessionID.equals(leaderSessionID)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderSessionID); + } + final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId()); final ExecutionState state = execution != null ? execution.getState() : null; return new PartitionState(taskResultId, partitionId.getPartitionId(), state); } @RpcMethod - public Acknowledge scheduleOrUpdateConsumers(final ResultPartitionID partitionID) { + public Acknowledge scheduleOrUpdateConsumers( + final UUID leaderSessionID, + final ResultPartitionID partitionID) throws Exception + { + if (!this.leaderSessionID.equals(leaderSessionID)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderSessionID); + } + executionGraph.scheduleOrUpdateConsumers(partitionID); return Acknowledge.get(); } @@ -609,223 +501,149 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @RpcMethod public void acknowledgeCheckpoint( - JobID jobID, - ExecutionAttemptID executionAttemptID, - long checkpointID, - CheckpointStateHandles checkpointStateHandles, - long synchronousDurationMillis, - long asynchronousDurationMillis, - long bytesBufferedInAlignment, - long alignmentDurationNanos) { - throw new UnsupportedOperationException(); - } - - @RpcMethod - public void declineCheckpoint( - JobID jobID, - ExecutionAttemptID executionAttemptID, - long checkpointID, - long checkpointTimestamp) { - throw new UnsupportedOperationException(); - } - - @RpcMethod - public void resourceRemoved(final ResourceID resourceId, final String message) { - // TODO: remove resource from slot pool - } + final JobID jobID, + final ExecutionAttemptID executionAttemptID, + final CheckpointMetaData checkpointInfo, + final CheckpointStateHandles checkpointState) + { + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + final AcknowledgeCheckpoint ackMessage = + new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointInfo, checkpointState); - @RpcMethod - public void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge) { - if (executionGraph != null) { - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - if (checkpointCoordinator != null) { - getRpcService().execute(new Runnable() { - @Override - public void run() { - try { - if (!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) { - log.info("Received message for non-existing checkpoint {}.", - acknowledge.getCheckpointId()); - } - } catch (Exception e) { - log.error("Error in CheckpointCoordinator while processing {}", acknowledge, e); + if (checkpointCoordinator != null) { + getRpcService().execute(new Runnable() { + @Override + public void run() { + try { + if (!checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) { + log.info("Received message for non-existing checkpoint {}.", + checkpointInfo.getCheckpointId()); } + } catch (Exception e) { + log.error("Error in CheckpointCoordinator while processing {}", checkpointInfo, e); } - }); - } - else { - log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", - jobGraph.getJobID()); - } + } + }); } else { - log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID()); + log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", + jobGraph.getJobID()); } } @RpcMethod - public void declineCheckpoint(final DeclineCheckpoint decline) { - if (executionGraph != null) { - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - if (checkpointCoordinator != null) { - getRpcService().execute(new Runnable() { - @Override - public void run() { - try { - if (!checkpointCoordinator.receiveDeclineMessage(decline)) { - log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId()); - } - } catch (Exception e) { - log.error("Error in CheckpointCoordinator while processing {}", decline, e); + public void declineCheckpoint( + final JobID jobID, + final ExecutionAttemptID executionAttemptID, + final long checkpointID) + { + final DeclineCheckpoint decline = new DeclineCheckpoint( + jobID, executionAttemptID, checkpointID, 0L); + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + + if (checkpointCoordinator != null) { + getRpcService().execute(new Runnable() { + @Override + public void run() { + try { + if (!checkpointCoordinator.receiveDeclineMessage(decline)) { + log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId()); } + } catch (Exception e) { + log.error("Error in CheckpointCoordinator while processing {}", decline, e); } - }); - } else { - log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", - jobGraph.getJobID()); - } + } + }); } else { - log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID()); + log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", + jobGraph.getJobID()); } } @RpcMethod public KvStateLocation lookupKvStateLocation(final String registrationName) throws Exception { - if (executionGraph != null) { - if (log.isDebugEnabled()) { - log.debug("Lookup key-value state for job {} with registration " + + if (log.isDebugEnabled()) { + log.debug("Lookup key-value state for job {} with registration " + "name {}.", jobGraph.getJobID(), registrationName); - } + } - final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry(); - final KvStateLocation location = registry.getKvStateLocation(registrationName); - if (location != null) { - return location; - } else { - throw new UnknownKvStateLocation(registrationName); - } + final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry(); + final KvStateLocation location = registry.getKvStateLocation(registrationName); + if (location != null) { + return location; } else { - throw new IllegalStateException("Received lookup KvState location request for unavailable job " + - jobGraph.getJobID()); + throw new UnknownKvStateLocation(registrationName); } } @RpcMethod public void notifyKvStateRegistered( - final JobVertexID jobVertexId, - final KeyGroupRange keyGroupRange, - final String registrationName, - final KvStateID kvStateId, - final KvStateServerAddress kvStateServerAddress) + final JobVertexID jobVertexId, + final KeyGroupRange keyGroupRange, + final String registrationName, + final KvStateID kvStateId, + final KvStateServerAddress kvStateServerAddress) { - if (executionGraph != null) { - if (log.isDebugEnabled()) { - log.debug("Key value state registered for job {} under name {}.", + if (log.isDebugEnabled()) { + log.debug("Key value state registered for job {} under name {}.", jobGraph.getJobID(), registrationName); - } - try { - executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered( - jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress - ); - } catch (Exception e) { - log.error("Failed to notify KvStateRegistry about registration {}.", registrationName); - } - } else { - log.error("Received notify KvState registered request for unavailable job " + jobGraph.getJobID()); + } + + try { + executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered( + jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress); + } catch (Exception e) { + log.error("Failed to notify KvStateRegistry about registration {}.", registrationName); } } @RpcMethod public void notifyKvStateUnregistered( - JobVertexID jobVertexId, - KeyGroupRange keyGroupRange, - String registrationName) + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName) { - if (executionGraph != null) { - if (log.isDebugEnabled()) { - log.debug("Key value state unregistered for job {} under name {}.", + if (log.isDebugEnabled()) { + log.debug("Key value state unregistered for job {} under name {}.", jobGraph.getJobID(), registrationName); - } - try { - executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered( - jobVertexId, keyGroupRange, registrationName - ); - } catch (Exception e) { - log.error("Failed to notify KvStateRegistry about registration {}.", registrationName); - } - } else { - log.error("Received notify KvState unregistered request for unavailable job " + jobGraph.getJobID()); } - } - - @RpcMethod - public Future<TriggerSavepointResponse> triggerSavepoint() throws Exception { - if (executionGraph != null) { - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - if (checkpointCoordinator != null) { - try { - Future<String> savepointFuture = new FlinkFuture<>( - checkpointCoordinator.triggerSavepoint(System.currentTimeMillis())); - - return savepointFuture.handleAsync(new BiFunction<String, Throwable, TriggerSavepointResponse>() { - @Override - public TriggerSavepointResponse apply(String savepointPath, Throwable throwable) { - if (throwable == null) { - return new TriggerSavepointResponse.Success(jobGraph.getJobID(), savepointPath); - } - else { - return new TriggerSavepointResponse.Failure(jobGraph.getJobID(), - new Exception("Failed to complete savepoint", throwable)); - } - } - }, getMainThreadExecutor()); - } catch (Exception e) { - FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>(); - future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(), - new Exception("Failed to trigger savepoint", e))); - return future; - } - } else { - FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>(); - future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(), - new IllegalStateException("Checkpointing disabled. You can enable it via the execution " + - "environment of your job."))); - return future; - } - } else { - FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>(); - future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(), - new IllegalArgumentException("Received trigger savepoint request for unavailable job " + - jobGraph.getJobID()))); - return future; + try { + executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered( + jobVertexId, keyGroupRange, registrationName); + } catch (Exception e) { + log.error("Failed to notify KvStateRegistry about registration {}.", registrationName); } } @RpcMethod - public DisposeSavepointResponse disposeSavepoint(final String savepointPath) { - try { - log.info("Disposing savepoint at {}.", savepointPath); + public Future<String> triggerSavepoint(final UUID leaderSessionID, final String targetDirectory) throws Exception { + if (!this.leaderSessionID.equals(leaderSessionID)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderSessionID); + } - // check whether the savepoint exists - savepointStore.loadSavepoint(savepointPath); + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator != null) { + Future<CompletedCheckpoint> completedCheckpointFuture = + checkpointCoordinator.triggerSavepoint(System.currentTimeMillis(), targetDirectory); - savepointStore.disposeSavepoint(savepointPath); - return new DisposeSavepointResponse.Success(); - } catch (Exception e) { - log.error("Failed to dispose savepoint at {}.", savepointPath, e); - return new DisposeSavepointResponse.Failure(e); + return completedCheckpointFuture.thenApplyAsync(new ApplyFunction<CompletedCheckpoint, String>() { + @Override + public String apply(CompletedCheckpoint checkpoint) { + return checkpoint.getExternalPath(); + } + }, executionContext); + } else { + throw new IllegalStateException("Checkpointing disabled. You can enable it via the execution " + + "environment of your job."); } } @RpcMethod public ClassloadingProps requestClassloadingProps() throws Exception { - if (executionGraph != null) { - return new ClassloadingProps(libraryCacheManager.getBlobServerPort(), + return new ClassloadingProps(libraryCacheManager.getBlobServerPort(), executionGraph.getRequiredJarFiles(), executionGraph.getRequiredClasspaths()); - } else { - throw new Exception("Received classloading props request for unavailable job " + jobGraph.getJobID()); - } } //---------------------------------------------------------------------------------------------- @@ -838,12 +656,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { public void run() { log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause); shutDown(); - jobCompletionActions.onFatalError(cause); + errorHandler.onFatalError(cause); } }); } - // TODO - wrap this as StatusListenerMessenger's callback with rpc main thread private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { final JobID jobID = executionGraph.getJobID(); final String jobName = executionGraph.getJobName(); @@ -871,36 +688,33 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { if (newJobStatus == JobStatus.FINISHED) { try { final Map<String, SerializedValue<Object>> accumulatorResults = - executionGraph.getAccumulatorsSerialized(); + executionGraph.getAccumulatorsSerialized(); final SerializedJobExecutionResult result = new SerializedJobExecutionResult( - jobID, 0, accumulatorResults // TODO get correct job duration + jobID, 0, accumulatorResults // TODO get correct job duration ); jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader)); } catch (Exception e) { log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e); final JobExecutionException exception = new JobExecutionException( - jobID, "Failed to retrieve accumulator results.", e); + jobID, "Failed to retrieve accumulator results.", e); // TODO should we also notify client? jobCompletionActions.jobFailed(exception); } - } - else if (newJobStatus == JobStatus.CANCELED) { + } else if (newJobStatus == JobStatus.CANCELED) { final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); final JobExecutionException exception = new JobExecutionException( - jobID, "Job was cancelled.", unpackedError); + jobID, "Job was cancelled.", unpackedError); // TODO should we also notify client? jobCompletionActions.jobFailed(exception); - } - else if (newJobStatus == JobStatus.FAILED) { + } else if (newJobStatus == JobStatus.FAILED) { final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); final JobExecutionException exception = new JobExecutionException( - jobID, "Job execution failed.", unpackedError); + jobID, "Job execution failed.", unpackedError); // TODO should we also notify client? jobCompletionActions.jobFailed(exception); - } - else { + } else { final JobExecutionException exception = new JobExecutionException( - jobID, newJobStatus + " is not a terminal state."); + jobID, newJobStatus + " is not a terminal state."); // TODO should we also notify client? jobCompletionActions.jobFailed(exception); throw new RuntimeException(exception); @@ -909,7 +723,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } private void notifyOfNewResourceManagerLeader( - final String resourceManagerAddress, final UUID resourceManagerLeaderId) + final String resourceManagerAddress, final UUID resourceManagerLeaderId) { // IMPORTANT: executed by main thread to avoid concurrence runAsync(new Runnable() { @@ -918,17 +732,15 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { if (resourceManagerConnection != null) { if (resourceManagerAddress != null) { if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress()) - && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) - { + && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) { // both address and leader id are not changed, we can keep the old connection return; } log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getTargetAddress(), resourceManagerAddress); - } - else { + resourceManagerConnection.getTargetAddress(), resourceManagerAddress); + } else { log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", - resourceManagerConnection.getTargetAddress()); + resourceManagerConnection.getTargetAddress()); } } @@ -937,8 +749,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { if (resourceManagerAddress != null) { log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); resourceManagerConnection = new ResourceManagerConnection( - log, jobGraph.getJobID(), leaderSessionID, - resourceManagerAddress, resourceManagerLeaderId, executionContext); + log, jobGraph.getJobID(), leaderSessionID, + resourceManagerAddress, resourceManagerLeaderId, executionContext); resourceManagerConnection.start(); } } @@ -952,26 +764,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { // TODO - add tests for comment in https://github.com/apache/flink/pull/2565 // verify the response with current connection if (resourceManagerConnection != null - && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) { + && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) { log.info("JobManager successfully registered at ResourceManager, leader id: {}.", - success.getResourceManagerLeaderId()); + success.getResourceManagerLeaderId()); } } }); } - private void disposeCommunicationWithResourceManager() { - // 1. stop the leader retriever so we will not receiving updates anymore - try { - resourceManagerLeaderRetriever.stop(); - } catch (Exception e) { - log.warn("Failed to stop resource manager leader retriever."); - } - - // 2. close current connection with ResourceManager if exists - closeResourceManagerConnection(); - } - private void closeResourceManagerConnection() { if (resourceManagerConnection != null) { resourceManagerConnection.close(); @@ -980,34 +780,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } //---------------------------------------------------------------------------------------------- - // Helper methods - //---------------------------------------------------------------------------------------------- - - /** - * Converts JobVertexIDs to corresponding ExecutionJobVertexes - * - * @param executionGraph The execution graph that holds the relationship - * @param vertexIDs The vertexIDs need to be converted - * @return The corresponding ExecutionJobVertexes - * @throws JobExecutionException - */ - private static List<ExecutionJobVertex> getExecutionJobVertexWithId( - final ExecutionGraph executionGraph, final List<JobVertexID> vertexIDs) - throws JobExecutionException - { - final List<ExecutionJobVertex> ret = new ArrayList<>(vertexIDs.size()); - for (JobVertexID vertexID : vertexIDs) { - final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(vertexID); - if (executionJobVertex == null) { - throw new JobExecutionException(executionGraph.getJobID(), - "The snapshot checkpointing settings refer to non-existent vertex " + vertexID); - } - ret.add(executionJobVertex); - } - return ret; - } - - //---------------------------------------------------------------------------------------------- // Utility classes //---------------------------------------------------------------------------------------------- @@ -1024,19 +796,19 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } private class ResourceManagerConnection - extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> + extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> { private final JobID jobID; private final UUID jobManagerLeaderID; ResourceManagerConnection( - final Logger log, - final JobID jobID, - final UUID jobManagerLeaderID, - final String resourceManagerAddress, - final UUID resourceManagerLeaderID, - final Executor executor) + final Logger log, + final JobID jobID, + final UUID jobManagerLeaderID, + final String resourceManagerAddress, + final UUID resourceManagerLeaderID, + final Executor executor) { super(log, resourceManagerAddress, resourceManagerLeaderID, executor); this.jobID = checkNotNull(jobID); @@ -1046,12 +818,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @Override protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() { return new RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess>( - log, getRpcService(), "ResourceManager", ResourceManagerGateway.class, - getTargetAddress(), getTargetLeaderId()) + log, getRpcService(), "ResourceManager", ResourceManagerGateway.class, + getTargetAddress(), getTargetLeaderId()) { @Override protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId, - long timeoutMillis) throws Exception + long timeoutMillis) throws Exception { Time timeout = Time.milliseconds(timeoutMillis); return gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/0615b62f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 4b51258..b27b41c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.jobmaster; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -29,15 +31,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; -import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse; -import org.apache.flink.runtime.jobmaster.message.NextInputSplit; -import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse; -import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -49,52 +47,56 @@ import java.util.UUID; */ public interface JobMasterGateway extends CheckpointCoordinatorGateway { - /** - * Starting the job under the given leader session ID. - */ - void startJob(final UUID leaderSessionID); + // ------------------------------------------------------------------------ + // Job start and stop methods + // ------------------------------------------------------------------------ - /** - * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared. - * Should re-submit the job before restarting it. - * - * @param cause The reason of why this job been suspended. - */ - void suspendJob(final Throwable cause); + void startJobExecution(); + + void suspendExecution(Throwable cause); + + // ------------------------------------------------------------------------ /** * Updates the task execution state for a given task. * + * @param leaderSessionID The leader id of JobManager * @param taskExecutionState New task execution state for a given task * @return Future flag of the task execution state update result */ - Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState); + Future<Acknowledge> updateTaskExecutionState( + final UUID leaderSessionID, + final TaskExecutionState taskExecutionState); /** * Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender * as a {@link SerializedInputSplit} message. * + * @param leaderSessionID The leader id of JobManager * @param vertexID The job vertex id * @param executionAttempt The execution attempt id * @return The future of the input split. If there is no further input split, will return an empty object. */ Future<SerializedInputSplit> requestNextInputSplit( - final JobVertexID vertexID, - final ExecutionAttemptID executionAttempt); + final UUID leaderSessionID, + final JobVertexID vertexID, + final ExecutionAttemptID executionAttempt); /** * Requests the current state of the partition. * The state of a partition is currently bound to the state of the producing execution. * + * @param leaderSessionID The leader id of JobManager * @param partitionId The partition ID of the partition to request the state of. * @param taskExecutionId The execution attempt ID of the task requesting the partition state. * @param taskResultId The input gate ID of the task requesting the partition state. * @return The future of the partition state */ Future<PartitionState> requestPartitionState( - final ResultPartitionID partitionId, - final ExecutionAttemptID taskExecutionId, - final IntermediateDataSetID taskResultId); + final UUID leaderSessionID, + final ResultPartitionID partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId); /** * Notifies the JobManager about available data for a produced partition. @@ -105,11 +107,15 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * <p> * The JobManager then can decide when to schedule the partition consumers of the given session. * - * @param partitionID The partition which has already produced data - * @param timeout before the rpc call fails + * @param leaderSessionID The leader id of JobManager + * @param partitionID The partition which has already produced data + * @param timeout before the rpc call fails * @return Future acknowledge of the schedule or update operation */ - Future<Acknowledge> scheduleOrUpdateConsumers(final ResultPartitionID partitionID, @RpcTimeout final Time timeout); + Future<Acknowledge> scheduleOrUpdateConsumers( + final UUID leaderSessionID, + final ResultPartitionID partitionID, + @RpcTimeout final Time timeout); /** * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the @@ -118,31 +124,6 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * @param resourceID identifying the TaskManager to disconnect */ void disconnectTaskManager(ResourceID resourceID); - void scheduleOrUpdateConsumers(final ResultPartitionID partitionID); - - /** - * Notifies the JobManager about the removal of a resource. - * - * @param resourceId The ID under which the resource is registered. - * @param message Optional message with details, for logging and debugging. - */ - - void resourceRemoved(final ResourceID resourceId, final String message); - - /** - * Notifies the JobManager that the checkpoint of an individual task is completed. - * - * @param acknowledge The acknowledge message of the checkpoint - */ - void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge); - - /** - * Notifies the JobManager that a checkpoint request could not be heeded. - * This can happen if a Task is already in RUNNING state but is internally not yet ready to perform checkpoints. - * - * @param decline The decline message of the checkpoint - */ - void declineCheckpoint(final DeclineCheckpoint decline); /** * Requests a {@link KvStateLocation} for the specified {@link KvState} registration name. @@ -150,7 +131,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * @param registrationName Name under which the KvState has been registered. * @return Future of the requested {@link KvState} location */ - Future<KvStateLocation> lookupKvStateLocation(final String registrationName) throws Exception; + Future<KvStateLocation> lookupKvStateLocation(final String registrationName); /** * @param jobVertexId JobVertexID the KvState instance belongs to. @@ -160,11 +141,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * @param kvStateServerAddress Server address where to find the KvState instance. */ void notifyKvStateRegistered( - final JobVertexID jobVertexId, - final KeyGroupRange keyGroupRange, - final String registrationName, - final KvStateID kvStateId, - final KvStateServerAddress kvStateServerAddress); + final JobVertexID jobVertexId, + final KeyGroupRange keyGroupRange, + final String registrationName, + final KvStateID kvStateId, + final KvStateServerAddress kvStateServerAddress); /** * @param jobVertexId JobVertexID the KvState instance belongs to. @@ -172,24 +153,18 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * @param registrationName Name under which the KvState has been registered. */ void notifyKvStateUnregistered( - JobVertexID jobVertexId, - KeyGroupRange keyGroupRange, - String registrationName); + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName); /** * Notifies the JobManager to trigger a savepoint for this job. * - * @return Future of the savepoint trigger response. - */ - Future<TriggerSavepointResponse> triggerSavepoint(); - - /** - * Notifies the Jobmanager to dispose specified savepoint. - * - * @param savepointPath The path of the savepoint. - * @return The future of the savepoint disponse response. + * @param leaderSessionID The leader id of JobManager + * @param targetDirectory The directory where the savepoint should be stored + * @return The savepoint path */ - Future<DisposeSavepointResponse> disposeSavepoint(final String savepointPath); + Future<String> triggerSavepoint(UUID leaderSessionID, String targetDirectory); /** * Request the classloading props of this job. http://git-wip-us.apache.org/repos/asf/flink/blob/0615b62f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java index e8fb5bb..019ccfe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.slf4j.Logger; @@ -62,6 +64,9 @@ public class MiniClusterJobDispatcher { /** al the services that the JobManager needs, such as BLOB service, factories, etc */ private final JobManagerServices jobManagerServices; + /** Registry for all metrics in the mini cluster */ + private final MetricRegistry metricRegistry; + /** The number of JobManagers to launch (more than one simulates a high-availability setup) */ private final int numJobManagers; @@ -86,8 +91,9 @@ public class MiniClusterJobDispatcher { public MiniClusterJobDispatcher( Configuration config, RpcService rpcService, - HighAvailabilityServices haServices) throws Exception { - this(config, rpcService, haServices, 1); + HighAvailabilityServices haServices, + MetricRegistry metricRegistry) throws Exception { + this(config, rpcService, haServices, metricRegistry, 1); } /** @@ -106,16 +112,18 @@ public class MiniClusterJobDispatcher { Configuration config, RpcService rpcService, HighAvailabilityServices haServices, + MetricRegistry metricRegistry, int numJobManagers) throws Exception { checkArgument(numJobManagers >= 1); this.configuration = checkNotNull(config); this.rpcService = checkNotNull(rpcService); this.haServices = checkNotNull(haServices); + this.metricRegistry = checkNotNull(metricRegistry); this.numJobManagers = numJobManagers; LOG.info("Creating JobMaster services"); - this.jobManagerServices = JobManagerServices.fromConfiguration(config); + this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices); } // ------------------------------------------------------------------------ @@ -140,9 +148,8 @@ public class MiniClusterJobDispatcher { if (runners != null) { this.runners = null; - Exception shutdownException = new Exception("The MiniCluster is shutting down"); for (JobManagerRunner runner : runners) { - runner.shutdown(shutdownException); + runner.shutdown(); } } } @@ -171,9 +178,9 @@ public class MiniClusterJobDispatcher { checkState(!shutdown, "mini cluster is shut down"); checkState(runners == null, "mini cluster can only execute one job at a time"); - OnCompletionActions onJobCompletion = new DetachedFinalizer(numJobManagers); + DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers); - this.runners = startJobRunners(job, onJobCompletion); + this.runners = startJobRunners(job, finalizer, finalizer); } } @@ -191,17 +198,17 @@ public class MiniClusterJobDispatcher { checkNotNull(job); LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID()); - final BlockingJobSync onJobCompletion = new BlockingJobSync(job.getJobID(), numJobManagers); + final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers); synchronized (lock) { checkState(!shutdown, "mini cluster is shut down"); checkState(runners == null, "mini cluster can only execute one job at a time"); - this.runners = startJobRunners(job, onJobCompletion); + this.runners = startJobRunners(job, sync, sync); } try { - return onJobCompletion.getResult(); + return sync.getResult(); } finally { // always clear the status for the next job @@ -209,24 +216,26 @@ public class MiniClusterJobDispatcher { } } - private JobManagerRunner[] startJobRunners(JobGraph job, OnCompletionActions onCompletion) throws JobExecutionException { + private JobManagerRunner[] startJobRunners( + JobGraph job, + OnCompletionActions onCompletion, + FatalErrorHandler errorHandler) throws JobExecutionException { LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID()); JobManagerRunner[] runners = new JobManagerRunner[numJobManagers]; for (int i = 0; i < numJobManagers; i++) { try { runners[i] = new JobManagerRunner(job, configuration, - rpcService, haServices, jobManagerServices, onCompletion); + rpcService, haServices, jobManagerServices, metricRegistry, + onCompletion, errorHandler); runners[i].start(); } catch (Throwable t) { // shut down all the ones so far - Exception shutdownCause = new Exception("Failed to properly start all mini cluster JobManagers", t); - for (int k = 0; k <= i; k++) { try { if (runners[i] != null) { - runners[i].shutdown(shutdownCause); + runners[i].shutdown(); } } catch (Throwable ignored) { // silent shutdown @@ -244,15 +253,15 @@ public class MiniClusterJobDispatcher { // test methods to simulate job master failures // ------------------------------------------------------------------------ - public void killJobMaster(int which) { - checkArgument(which >= 0 && which < numJobManagers, "no such job master"); - checkState(!shutdown, "mini cluster is shut down"); - - JobManagerRunner[] runners = this.runners; - checkState(runners != null, "mini cluster it not executing a job right now"); - - runners[which].shutdown(new Throwable("kill JobManager")); - } +// public void killJobMaster(int which) { +// checkArgument(which >= 0 && which < numJobManagers, "no such job master"); +// checkState(!shutdown, "mini cluster is shut down"); +// +// JobManagerRunner[] runners = this.runners; +// checkState(runners != null, "mini cluster it not executing a job right now"); +// +// runners[which].shutdown(new Throwable("kill JobManager")); +// } // ------------------------------------------------------------------------ // utility classes @@ -263,7 +272,7 @@ public class MiniClusterJobDispatcher { * In the case of a high-availability test setup, there may be multiple runners. * After that, it marks the mini cluster as ready to receive new jobs. */ - private class DetachedFinalizer implements OnCompletionActions { + private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler { private final AtomicInteger numJobManagersToWaitFor; @@ -308,7 +317,7 @@ public class MiniClusterJobDispatcher { * That way it is guaranteed that after the blocking job submit call returns, * the dispatcher is immediately free to accept another job. */ - private static class BlockingJobSync implements OnCompletionActions { + private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler { private final JobID jobId; http://git-wip-us.apache.org/repos/asf/flink/blob/0615b62f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java deleted file mode 100644 index 42bfc71..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmaster.message; - -import java.io.Serializable; - -/** - * The response of the dispose savepoint request to JobManager. - */ -public abstract class DisposeSavepointResponse implements Serializable { - - private static final long serialVersionUID = 6008792963949369567L; - - public static class Success extends DisposeSavepointResponse implements Serializable { - - private static final long serialVersionUID = 1572462960008711415L; - } - - public static class Failure extends DisposeSavepointResponse implements Serializable { - - private static final long serialVersionUID = -7505308325483022458L; - - private final Throwable cause; - - public Failure(final Throwable cause) { - this.cause = cause; - } - - public Throwable getCause() { - return cause; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0615b62f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java deleted file mode 100644 index 0b0edc5..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmaster.message; - -import org.apache.flink.api.common.JobID; - -import java.io.Serializable; - -/** - * The response of the trigger savepoint request to JobManager. - */ -public abstract class TriggerSavepointResponse implements Serializable { - - private static final long serialVersionUID = 3139327824611807707L; - - private final JobID jobID; - - public JobID getJobID() { - return jobID; - } - - public TriggerSavepointResponse(final JobID jobID) { - this.jobID = jobID; - } - - public static class Success extends TriggerSavepointResponse implements Serializable { - - private static final long serialVersionUID = -1100637460388881776L; - - private final String savepointPath; - - public Success(final JobID jobID, final String savepointPath) { - super(jobID); - this.savepointPath = savepointPath; - } - - public String getSavepointPath() { - return savepointPath; - } - } - - public static class Failure extends TriggerSavepointResponse implements Serializable { - - private static final long serialVersionUID = -1668479003490615139L; - - private final Throwable cause; - - public Failure(final JobID jobID, final Throwable cause) { - super(jobID); - this.cause = cause; - } - - public Throwable getCause() { - return cause; - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/0615b62f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 2052f98..4b9100a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit; public interface RpcService { /** - * Return the address under which the rpc service can be reached. If the rpc service cannot be - * contacted remotely, then it will return an empty string. + * Return the hostname or host address under which the rpc service can be reached. + * If the rpc service cannot be contacted remotely, then it will return an empty string. * * @return Address of the rpc service or empty string if local rpc service */ http://git-wip-us.apache.org/repos/asf/flink/blob/0615b62f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java index ef62ef1..6fcd082 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java @@ -26,11 +26,16 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.util.Preconditions; +import java.util.UUID; + /** * Container class for JobManager specific communication utils used by the {@link TaskExecutor}. */ public class JobManagerConnection { + // Job master leader session id + private final UUID jobMasterLeaderId; + // Gateway to the job master private final JobMasterGateway jobMasterGateway; @@ -50,13 +55,15 @@ public class JobManagerConnection { private final PartitionStateChecker partitionStateChecker; public JobManagerConnection( - JobMasterGateway jobMasterGateway, - TaskManagerActions taskManagerActions, - CheckpointResponder checkpointResponder, - LibraryCacheManager libraryCacheManager, - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, - PartitionStateChecker partitionStateChecker) { - + UUID jobMasterLeaderId, + JobMasterGateway jobMasterGateway, + TaskManagerActions taskManagerActions, + CheckpointResponder checkpointResponder, + LibraryCacheManager libraryCacheManager, + ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, + PartitionStateChecker partitionStateChecker) + { + this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId); this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions); this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder); @@ -65,6 +72,10 @@ public class JobManagerConnection { this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker); } + public UUID getJobMasterLeaderId() { + return jobMasterLeaderId; + } + public JobMasterGateway getJobManagerGateway() { return jobMasterGateway; }