http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index b94f904..abc59cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -18,26 +18,19 @@ package org.apache.flink.runtime.jobmaster; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.SubtaskState; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; -import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; -import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; -import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.client.SerializedJobExecutionResult; @@ -48,9 +41,10 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -59,16 +53,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; -import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse; -import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; @@ -84,22 +73,26 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedValue; + import org.slf4j.Logger; +import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -110,16 +103,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * It offers the following methods as part of its rpc interface to interact with the JobMaster * remotely: * <ul> - * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for + * <li>{@link #updateTaskExecutionState} updates the task execution state for * given task</li> * </ul> */ public class JobMaster extends RpcEndpoint<JobMasterGateway> { + private static final AtomicReferenceFieldUpdater<JobMaster, UUID> LEADER_ID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(JobMaster.class, UUID.class, "leaderSessionID"); + + // ------------------------------------------------------------------------ + /** Logical representation of the job */ private final JobGraph jobGraph; - /** Configuration of the job */ + /** Configuration of the JobManager */ private final Configuration configuration; /** Service to contend for and retrieve the leadership of JM and RM */ @@ -128,37 +126,24 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { /** Blob cache manager used across jobs */ private final BlobLibraryCacheManager libraryCacheManager; - /** Factory to create restart strategy for this job */ - private final RestartStrategyFactory restartStrategyFactory; - - /** Store for save points */ - private final SavepointStore savepointStore; - - /** The timeout for this job */ - private final Time timeout; - - /** The scheduler to use for scheduling new tasks as they are needed */ - private final Scheduler scheduler; + /** The metrics for the JobManager itself */ + private final MetricGroup jobManagerMetricGroup; - /** The metrics group used across jobs */ - private final JobManagerMetricGroup jobManagerMetricGroup; + /** The metrics for the job */ + private final MetricGroup jobMetricGroup; /** The execution context which is used to execute futures */ - private final Executor executionContext; + private final ExecutorService executionContext; private final OnCompletionActions jobCompletionActions; - /** The execution graph of this job */ - private volatile ExecutionGraph executionGraph; - - /** The checkpoint recovery factory used by this job */ - private CheckpointRecoveryFactory checkpointRecoveryFactory; + private final FatalErrorHandler errorHandler; - private ClassLoader userCodeLoader; + private final ClassLoader userCodeLoader; - private RestartStrategy restartStrategy; + /** The execution graph of this job */ + private final ExecutionGraph executionGraph; - private MetricGroup jobMetrics; private volatile UUID leaderSessionID; @@ -168,22 +153,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { private LeaderRetrievalService resourceManagerLeaderRetriever; /** Connection with ResourceManager, null if not located address yet or we close it initiative */ - private volatile ResourceManagerConnection resourceManagerConnection; + private ResourceManagerConnection resourceManagerConnection; + + // TODO - we need to replace this with the slot pool + private final Scheduler scheduler; // ------------------------------------------------------------------------ public JobMaster( - JobGraph jobGraph, - Configuration configuration, - RpcService rpcService, - HighAvailabilityServices highAvailabilityService, - BlobLibraryCacheManager libraryCacheManager, - RestartStrategyFactory restartStrategyFactory, - SavepointStore savepointStore, - Time timeout, - Scheduler scheduler, - JobManagerMetricGroup jobManagerMetricGroup, - OnCompletionActions jobCompletionActions) + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityService, + ExecutorService executorService, + BlobLibraryCacheManager libraryCacheManager, + RestartStrategyFactory restartStrategyFactory, + Time rpcAskTimeout, + @Nullable JobManagerMetricGroup jobManagerMetricGroup, + OnCompletionActions jobCompletionActions, + FatalErrorHandler errorHandler, + ClassLoader userCodeLoader) throws Exception { super(rpcService); @@ -191,293 +180,150 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { this.configuration = checkNotNull(configuration); this.highAvailabilityServices = checkNotNull(highAvailabilityService); this.libraryCacheManager = checkNotNull(libraryCacheManager); - this.restartStrategyFactory = checkNotNull(restartStrategyFactory); - this.savepointStore = checkNotNull(savepointStore); - this.timeout = checkNotNull(timeout); - this.scheduler = checkNotNull(scheduler); - this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup); - this.executionContext = checkNotNull(rpcService.getExecutor()); + this.executionContext = checkNotNull(executorService); this.jobCompletionActions = checkNotNull(jobCompletionActions); - } - - //---------------------------------------------------------------------------------------------- - // Lifecycle management - //---------------------------------------------------------------------------------------------- + this.errorHandler = checkNotNull(errorHandler); + this.userCodeLoader = checkNotNull(userCodeLoader); - /** - * Initializing the job execution environment, should be called before start. Any error occurred during - * initialization will be treated as job submission failure. - * - * @throws JobSubmissionException - */ - public void init() throws JobSubmissionException { - log.info("Initializing job {} ({}).", jobGraph.getName(), jobGraph.getJobID()); + final String jobName = jobGraph.getName(); + final JobID jid = jobGraph.getJobID(); - try { - // IMPORTANT: We need to make sure that the library registration is the first action, - // because this makes sure that the uploaded jar files are removed in case of - // unsuccessful - try { - libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), - jobGraph.getClasspaths()); - } catch (Throwable t) { - throw new JobSubmissionException(jobGraph.getJobID(), - "Cannot set up the user code libraries: " + t.getMessage(), t); - } + if (jobManagerMetricGroup != null) { + this.jobManagerMetricGroup = jobManagerMetricGroup; + this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph); + } else { + this.jobManagerMetricGroup = new UnregisteredMetricsGroup(); + this.jobMetricGroup = new UnregisteredMetricsGroup(); + } - userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID()); - if (userCodeLoader == null) { - throw new JobSubmissionException(jobGraph.getJobID(), - "The user code class loader could not be initialized."); - } + log.info("Initializing job {} ({}).", jobName, jid); - if (jobGraph.getNumberOfVertices() == 0) { - throw new JobSubmissionException(jobGraph.getJobID(), "The given job is empty"); - } - - final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = + final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = jobGraph.getSerializedExecutionConfig() - .deserializeValue(userCodeLoader) - .getRestartStrategy(); - if (restartStrategyConfiguration != null) { - restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration); - } - else { - restartStrategy = restartStrategyFactory.createRestartStrategy(); - } + .deserializeValue(userCodeLoader) + .getRestartStrategy(); - log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobGraph.getName(), jobGraph.getJobID()); + final RestartStrategy restartStrategy = (restartStrategyConfiguration != null) ? + RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) : + restartStrategyFactory.createRestartStrategy(); - if (jobManagerMetricGroup != null) { - jobMetrics = jobManagerMetricGroup.addJob(jobGraph); - } - if (jobMetrics == null) { - jobMetrics = new UnregisteredMetricsGroup(); - } + log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid); - try { - checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); - } catch (Exception e) { - log.error("Could not get the checkpoint recovery factory.", e); - throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e); - } + CheckpointRecoveryFactory checkpointRecoveryFactory; + try { + checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); + } catch (Exception e) { + log.error("Could not create the access to highly-available checkpoint storage.", e); + throw new Exception("Could not create the access to highly-available checkpoint storage.", e); + } - try { - resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); - } catch (Exception e) { - log.error("Could not get the resource manager leader retriever.", e); - throw new JobSubmissionException(jobGraph.getJobID(), + try { + resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); + } catch (Exception e) { + log.error("Could not get the resource manager leader retriever.", e); + throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the resource manager leader retriever.", e); - } - } catch (Throwable t) { - log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t); - - libraryCacheManager.unregisterJob(jobGraph.getJobID()); - - if (t instanceof JobSubmissionException) { - throw (JobSubmissionException) t; - } - else { - throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " + - jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t); - } } + + this.executionGraph = ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + configuration, + executorService, + executorService, + userCodeLoader, + checkpointRecoveryFactory, + rpcAskTimeout, + restartStrategy, + jobMetricGroup, + -1, + log); + + // TODO - temp fix + this.scheduler = new Scheduler(executorService); } + //---------------------------------------------------------------------------------------------- + // Lifecycle management + //---------------------------------------------------------------------------------------------- + + @Override public void start() { - super.start(); + throw new UnsupportedOperationException("Should never call start() without leader ID"); } + /** + * Start the rpc service and begin to run the job. + * + * @param leaderSessionID The necessary leader id for running the job. + */ + public void start(final UUID leaderSessionID) { + if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) { + super.start(); + + log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID()); + getSelf().startJobExecution(); + } else { + log.warn("Job already started with leaderId {}, ignoring this start request.", leaderSessionID); + } + } + + /** + * Suspend the job and shutdown all other services including rpc. + */ @Override public void shutDown() { + // make sure there is a graceful exit + getSelf().suspendExecution(new Exception("JobManager is shutting down.")); super.shutDown(); - - suspendJob(new Exception("JobManager is shutting down.")); - - disposeCommunicationWithResourceManager(); } - - //---------------------------------------------------------------------------------------------- // RPC methods //---------------------------------------------------------------------------------------------- - /** - * Start to run the job, runtime data structures like ExecutionGraph will be constructed now and checkpoint - * being recovered. After this, we will begin to schedule the job. - */ - @RpcMethod - public void startJob(final UUID leaderSessionID) { - log.info("Starting job {} ({}) with leaderId {}.", jobGraph.getName(), jobGraph.getJobID(), leaderSessionID); + //-- job starting and stopping ----------------------------------------------------------------- - this.leaderSessionID = leaderSessionID; + @RpcMethod + public void startJobExecution() { + log.info("Starting execution of job {} ({}) with leaderId {}.", + jobGraph.getName(), jobGraph.getJobID(), leaderSessionID); try { - if (executionGraph != null) { - executionGraph = new ExecutionGraph( - executionContext, - executionContext, - jobGraph.getJobID(), - jobGraph.getName(), - jobGraph.getJobConfiguration(), - jobGraph.getSerializedExecutionConfig(), - timeout, - restartStrategy, - jobGraph.getUserJarBlobKeys(), - jobGraph.getClasspaths(), - userCodeLoader, - jobMetrics); - } else { - // TODO: update last active time in JobInfo - } - - executionGraph.setScheduleMode(jobGraph.getScheduleMode()); - executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling()); - - try { - executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph)); - } catch (Exception e) { - log.warn("Cannot create JSON plan for job {} ({})", jobGraph.getJobID(), jobGraph.getName(), e); - executionGraph.setJsonPlan("{}"); - } - - // initialize the vertices that have a master initialization hook - // file output formats create directories here, input formats create splits - if (log.isDebugEnabled()) { - log.debug("Running initialization on master for job {} ({}).", jobGraph.getJobID(), jobGraph.getName()); - } - for (JobVertex vertex : jobGraph.getVertices()) { - final String executableClass = vertex.getInvokableClassName(); - if (executableClass == null || executableClass.length() == 0) { - throw new JobExecutionException(jobGraph.getJobID(), - "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class."); - } - if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) { - vertex.setParallelism(scheduler.getTotalNumberOfSlots()); - } - - try { - vertex.initializeOnMaster(userCodeLoader); - } catch (Throwable t) { - throw new JobExecutionException(jobGraph.getJobID(), - "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t); - } - } - - // topologically sort the job vertices and attach the graph to the existing one - final List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); - if (log.isDebugEnabled()) { - log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), - jobGraph.getJobID(), jobGraph.getName()); - } - executionGraph.attachJobGraph(sortedTopology); - - if (log.isDebugEnabled()) { - log.debug("Successfully created execution graph from job graph {} ({}).", - jobGraph.getJobID(), jobGraph.getName()); - } - - final JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings(); - if (snapshotSettings != null) { - List<ExecutionJobVertex> triggerVertices = getExecutionJobVertexWithId( - executionGraph, snapshotSettings.getVerticesToTrigger()); - - List<ExecutionJobVertex> ackVertices = getExecutionJobVertexWithId( - executionGraph, snapshotSettings.getVerticesToAcknowledge()); - - List<ExecutionJobVertex> confirmVertices = getExecutionJobVertexWithId( - executionGraph, snapshotSettings.getVerticesToConfirm()); - - CompletedCheckpointStore completedCheckpoints = checkpointRecoveryFactory.createCheckpointStore( - jobGraph.getJobID(), userCodeLoader); - - CheckpointIDCounter checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter( - jobGraph.getJobID()); - - // Checkpoint stats tracker - boolean isStatsDisabled = configuration.getBoolean( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE); - - final CheckpointStatsTracker checkpointStatsTracker; - if (isStatsDisabled) { - checkpointStatsTracker = new DisabledCheckpointStatsTracker(); - } - else { - int historySize = configuration.getInteger( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); - checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics); - } - - String externalizedCheckpointsDir = configuration.getString( - ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null); - - executionGraph.enableSnapshotCheckpointing( - snapshotSettings.getCheckpointInterval(), - snapshotSettings.getCheckpointTimeout(), - snapshotSettings.getMinPauseBetweenCheckpoints(), - snapshotSettings.getMaxConcurrentCheckpoints(), - snapshotSettings.getExternalizedCheckpointSettings(), - triggerVertices, - ackVertices, - confirmVertices, - checkpointIdCounter, - completedCheckpoints, - externalizedCheckpointsDir, - checkpointStatsTracker); - } - - // TODO: register this class to execution graph as job status change listeners - - // TODO: register client as job / execution status change listeners if they are interested - - /* - TODO: decide whether we should take the savepoint before recovery - - if (isRecovery) { - // this is a recovery of a master failure (this master takes over) - executionGraph.restoreLatestCheckpointedState(); - } else { - if (snapshotSettings != null) { - String savepointPath = snapshotSettings.getSavepointPath(); - if (savepointPath != null) { - // got a savepoint - log.info("Starting job from savepoint {}.", savepointPath); - - // load the savepoint as a checkpoint into the system - final CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint( - jobGraph.getJobID(), executionGraph.getAllVertices(), savepointStore, savepointPath); - executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint); - - // Reset the checkpoint ID counter - long nextCheckpointId = savepoint.getCheckpointID() + 1; - log.info("Reset the checkpoint ID to " + nextCheckpointId); - executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId); - - executionGraph.restoreLatestCheckpointedState(); - } + // register self as job status change listener + executionGraph.registerJobStatusListener(new JobStatusListener() { + @Override + public void jobStatusChanges( + final JobID jobId, final JobStatus newJobStatus, final long timestamp, final Throwable error) + { + // run in rpc thread to avoid concurrency + runAsync(new Runnable() { + @Override + public void run() { + jobStatusChanged(newJobStatus, timestamp, error); + } + }); } - } - */ + }); - // job is good to go, try to locate resource manager's address + // job is ready to go, try to establish connection with resource manager resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); } catch (Throwable t) { + + // TODO - this should not result in a job failure, but another leader should take over + // TODO - either this master should retry the execution, or it should relinquish leadership / terminate + log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t); executionGraph.fail(t); - executionGraph = null; - final Throwable rt; + final JobExecutionException rt; if (t instanceof JobExecutionException) { rt = (JobExecutionException) t; - } - else { + } else { rt = new JobExecutionException(jobGraph.getJobID(), - "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t); + "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t); } // TODO: notify client about this failure @@ -490,34 +336,51 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { executionContext.execute(new Runnable() { @Override public void run() { - if (executionGraph != null) { - try { - executionGraph.scheduleForExecution(scheduler); - } catch (Throwable t) { - executionGraph.fail(t); - } + try { + executionGraph.scheduleForExecution(scheduler); + } catch (Throwable t) { + executionGraph.fail(t); } } }); } /** - * Suspending job, all the running tasks will be cancelled, and runtime data will be cleared. + * Suspending job, all the running tasks will be cancelled, and communication with other components + * will be disposed. + * + * <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by + * calling the {@link #start(UUID)} method once we take the leadership back again. * * @param cause The reason of why this job been suspended. */ @RpcMethod - public void suspendJob(final Throwable cause) { + public void suspendExecution(final Throwable cause) { + if (leaderSessionID == null) { + log.debug("Job has already been suspended or shutdown."); + return; + } + + // receive no more messages until started again, should be called before we clear self leader id + ((StartStoppable) getSelf()).stop(); + leaderSessionID = null; + executionGraph.suspend(cause); - if (executionGraph != null) { - executionGraph.suspend(cause); - executionGraph = null; + // disconnect from resource manager: + try { + resourceManagerLeaderRetriever.stop(); + } catch (Exception e) { + log.warn("Failed to stop resource manager leader retriever when suspending."); } + closeResourceManagerConnection(); + + // TODO: disconnect from all registered task managers - disposeCommunicationWithResourceManager(); } + //---------------------------------------------------------------------------------------------- + /** * Updates the task execution state for a given task. * @@ -525,26 +388,38 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { * @return Acknowledge the task execution state update */ @RpcMethod - public Acknowledge updateTaskExecutionState(final TaskExecutionState taskExecutionState) throws ExecutionGraphException { + public Acknowledge updateTaskExecutionState( + final UUID leaderSessionID, + final TaskExecutionState taskExecutionState) throws Exception + { if (taskExecutionState == null) { throw new NullPointerException("TaskExecutionState must not be null."); } + if (!this.leaderSessionID.equals(leaderSessionID)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderSessionID); + } + if (executionGraph.updateState(taskExecutionState)) { return Acknowledge.get(); } else { throw new ExecutionGraphException("The execution attempt " + - taskExecutionState.getID() + " was not found."); + taskExecutionState.getID() + " was not found."); } - } - @RpcMethod public SerializedInputSplit requestNextInputSplit( - final JobVertexID vertexID, - final ExecutionAttemptID executionAttempt) throws Exception + final UUID leaderSessionID, + final JobVertexID vertexID, + final ExecutionAttemptID executionAttempt) throws Exception { + if (!this.leaderSessionID.equals(leaderSessionID)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderSessionID); + } + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); if (execution == null) { // can happen when JobManager had already unregistered this execution upon on task failure, @@ -583,7 +458,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } catch (Exception ex) { log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); IOException reason = new IOException("Could not serialize the next input split of class " + - nextInputSplit.getClass() + ".", ex); + nextInputSplit.getClass() + ".", ex); vertex.fail(reason); throw reason; } @@ -591,16 +466,21 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @RpcMethod public ExecutionState requestPartitionState( - final JobID ignored, - final IntermediateDataSetID intermediateResultId, - final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException { + final UUID leaderSessionID, + final IntermediateDataSetID intermediateResultId, + final ResultPartitionID resultPartitionId) throws Exception { + + if (!this.leaderSessionID.equals(leaderSessionID)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderSessionID); + } final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId()); if (execution != null) { return execution.getState(); } else { - final IntermediateResult intermediateResult = + final IntermediateResult intermediateResult = executionGraph.getAllIntermediateResults().get(intermediateResultId); if (intermediateResult != null) { @@ -623,7 +503,15 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } @RpcMethod - public Acknowledge scheduleOrUpdateConsumers(ResultPartitionID partitionID) throws ExecutionGraphException { + public Acknowledge scheduleOrUpdateConsumers( + final UUID leaderSessionID, + final ResultPartitionID partitionID) throws Exception + { + if (!this.leaderSessionID.equals(leaderSessionID)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderSessionID); + } + executionGraph.scheduleOrUpdateConsumers(partitionID); return Acknowledge.get(); } @@ -638,171 +526,118 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { final JobID jobID, final ExecutionAttemptID executionAttemptID, final CheckpointMetaData checkpointInfo, - final SubtaskState checkpointStateHandles) { + final SubtaskState checkpointState) throws CheckpointException { - throw new UnsupportedOperationException(); + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + final AcknowledgeCheckpoint ackMessage = + new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointInfo, checkpointState); + + if (checkpointCoordinator != null) { + getRpcService().execute(new Runnable() { + @Override + public void run() { + try { + checkpointCoordinator.receiveAcknowledgeMessage(ackMessage); + } catch (Throwable t) { + log.warn("Error while processing checkpoint acknowledgement message"); + } + } + }); + } else { + log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", + jobGraph.getJobID()); + } } @RpcMethod public void declineCheckpoint( final JobID jobID, final ExecutionAttemptID executionAttemptID, - final long checkpointId, - final Throwable cause) { - - throw new UnsupportedOperationException(); - } - - //---------------------------------------------------------------------------------------------- - // Internal methods - //---------------------------------------------------------------------------------------------- - - @RpcMethod - public void resourceRemoved(final ResourceID resourceId, final String message) { - // TODO: remove resource from slot pool - } + final long checkpointID, + final Throwable reason) + { + final DeclineCheckpoint decline = new DeclineCheckpoint( + jobID, executionAttemptID, checkpointID, reason); + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - @RpcMethod - public void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge) { - if (executionGraph != null) { - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - if (checkpointCoordinator != null) { - getRpcService().execute(new Runnable() { - @Override - public void run() { - try { - if (!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) { - log.info("Received message for non-existing checkpoint {}.", - acknowledge.getCheckpointId()); - } - } catch (Exception e) { - log.error("Error in CheckpointCoordinator while processing {}", acknowledge, e); - } + if (checkpointCoordinator != null) { + getRpcService().execute(new Runnable() { + @Override + public void run() { + try { + checkpointCoordinator.receiveDeclineMessage(decline); + } catch (Exception e) { + log.error("Error in CheckpointCoordinator while processing {}", decline, e); } - }); - } - else { - log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", - jobGraph.getJobID()); - } + } + }); } else { - log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID()); - } - } - - @RpcMethod - public void declineCheckpoint(final DeclineCheckpoint decline) { - if (executionGraph != null) { - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - if (checkpointCoordinator != null) { - getRpcService().execute(new Runnable() { - @Override - public void run() { - try { - log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId()); - } catch (Exception e) { - log.error("Error in CheckpointCoordinator while processing {}", decline, e); - } - } - }); - } else { - log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", + log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", jobGraph.getJobID()); - } - } else { - log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID()); } } @RpcMethod public KvStateLocation lookupKvStateLocation(final String registrationName) throws Exception { - if (executionGraph != null) { - if (log.isDebugEnabled()) { - log.debug("Lookup key-value state for job {} with registration " + + if (log.isDebugEnabled()) { + log.debug("Lookup key-value state for job {} with registration " + "name {}.", jobGraph.getJobID(), registrationName); - } + } - final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry(); - final KvStateLocation location = registry.getKvStateLocation(registrationName); - if (location != null) { - return location; - } else { - throw new UnknownKvStateLocation(registrationName); - } + final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry(); + final KvStateLocation location = registry.getKvStateLocation(registrationName); + if (location != null) { + return location; } else { - throw new IllegalStateException("Received lookup KvState location request for unavailable job " + - jobGraph.getJobID()); + throw new UnknownKvStateLocation(registrationName); } } @RpcMethod public void notifyKvStateRegistered( - final JobVertexID jobVertexId, - final KeyGroupRange keyGroupRange, - final String registrationName, - final KvStateID kvStateId, - final KvStateServerAddress kvStateServerAddress) + final JobVertexID jobVertexId, + final KeyGroupRange keyGroupRange, + final String registrationName, + final KvStateID kvStateId, + final KvStateServerAddress kvStateServerAddress) { - if (executionGraph != null) { - if (log.isDebugEnabled()) { - log.debug("Key value state registered for job {} under name {}.", + if (log.isDebugEnabled()) { + log.debug("Key value state registered for job {} under name {}.", jobGraph.getJobID(), registrationName); - } - try { - executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered( - jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress - ); - } catch (Exception e) { - log.error("Failed to notify KvStateRegistry about registration {}.", registrationName); - } - } else { - log.error("Received notify KvState registered request for unavailable job " + jobGraph.getJobID()); + } + + try { + executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered( + jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress); + } catch (Exception e) { + log.error("Failed to notify KvStateRegistry about registration {}.", registrationName); } } @RpcMethod public void notifyKvStateUnregistered( - JobVertexID jobVertexId, - KeyGroupRange keyGroupRange, - String registrationName) + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName) { - if (executionGraph != null) { - if (log.isDebugEnabled()) { - log.debug("Key value state unregistered for job {} under name {}.", + if (log.isDebugEnabled()) { + log.debug("Key value state unregistered for job {} under name {}.", jobGraph.getJobID(), registrationName); - } - try { - executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered( - jobVertexId, keyGroupRange, registrationName - ); - } catch (Exception e) { - log.error("Failed to notify KvStateRegistry about registration {}.", registrationName); - } - } else { - log.error("Received notify KvState unregistered request for unavailable job " + jobGraph.getJobID()); } - } - @RpcMethod - public Future<TriggerSavepointResponse> triggerSavepoint() throws Exception { - return null; - } - - @RpcMethod - public DisposeSavepointResponse disposeSavepoint(final String savepointPath) { - // TODO - return null; + try { + executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered( + jobVertexId, keyGroupRange, registrationName); + } catch (Exception e) { + log.error("Failed to notify KvStateRegistry about registration {}.", registrationName); + } } @RpcMethod public ClassloadingProps requestClassloadingProps() throws Exception { - if (executionGraph != null) { - return new ClassloadingProps(libraryCacheManager.getBlobServerPort(), + return new ClassloadingProps(libraryCacheManager.getBlobServerPort(), executionGraph.getRequiredJarFiles(), executionGraph.getRequiredClasspaths()); - } else { - throw new Exception("Received classloading props request for unavailable job " + jobGraph.getJobID()); - } } //---------------------------------------------------------------------------------------------- @@ -815,12 +650,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { public void run() { log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause); shutDown(); - jobCompletionActions.onFatalError(cause); + errorHandler.onFatalError(cause); } }); } - // TODO - wrap this as StatusListenerMessenger's callback with rpc main thread private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { final JobID jobID = executionGraph.getJobID(); final String jobName = executionGraph.getJobName(); @@ -848,36 +682,33 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { if (newJobStatus == JobStatus.FINISHED) { try { final Map<String, SerializedValue<Object>> accumulatorResults = - executionGraph.getAccumulatorsSerialized(); + executionGraph.getAccumulatorsSerialized(); final SerializedJobExecutionResult result = new SerializedJobExecutionResult( - jobID, 0, accumulatorResults // TODO get correct job duration + jobID, 0, accumulatorResults // TODO get correct job duration ); jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader)); } catch (Exception e) { log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e); final JobExecutionException exception = new JobExecutionException( - jobID, "Failed to retrieve accumulator results.", e); + jobID, "Failed to retrieve accumulator results.", e); // TODO should we also notify client? jobCompletionActions.jobFailed(exception); } - } - else if (newJobStatus == JobStatus.CANCELED) { + } else if (newJobStatus == JobStatus.CANCELED) { final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); final JobExecutionException exception = new JobExecutionException( - jobID, "Job was cancelled.", unpackedError); + jobID, "Job was cancelled.", unpackedError); // TODO should we also notify client? jobCompletionActions.jobFailed(exception); - } - else if (newJobStatus == JobStatus.FAILED) { + } else if (newJobStatus == JobStatus.FAILED) { final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); final JobExecutionException exception = new JobExecutionException( - jobID, "Job execution failed.", unpackedError); + jobID, "Job execution failed.", unpackedError); // TODO should we also notify client? jobCompletionActions.jobFailed(exception); - } - else { + } else { final JobExecutionException exception = new JobExecutionException( - jobID, newJobStatus + " is not a terminal state."); + jobID, newJobStatus + " is not a terminal state."); // TODO should we also notify client? jobCompletionActions.jobFailed(exception); throw new RuntimeException(exception); @@ -886,7 +717,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } private void notifyOfNewResourceManagerLeader( - final String resourceManagerAddress, final UUID resourceManagerLeaderId) + final String resourceManagerAddress, final UUID resourceManagerLeaderId) { // IMPORTANT: executed by main thread to avoid concurrence runAsync(new Runnable() { @@ -895,17 +726,15 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { if (resourceManagerConnection != null) { if (resourceManagerAddress != null) { if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress()) - && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) - { + && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) { // both address and leader id are not changed, we can keep the old connection return; } log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getTargetAddress(), resourceManagerAddress); - } - else { + resourceManagerConnection.getTargetAddress(), resourceManagerAddress); + } else { log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", - resourceManagerConnection.getTargetAddress()); + resourceManagerConnection.getTargetAddress()); } } @@ -914,8 +743,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { if (resourceManagerAddress != null) { log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); resourceManagerConnection = new ResourceManagerConnection( - log, jobGraph.getJobID(), leaderSessionID, - resourceManagerAddress, resourceManagerLeaderId, executionContext); + log, jobGraph.getJobID(), leaderSessionID, + resourceManagerAddress, resourceManagerLeaderId, executionContext); resourceManagerConnection.start(); } } @@ -929,26 +758,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { // TODO - add tests for comment in https://github.com/apache/flink/pull/2565 // verify the response with current connection if (resourceManagerConnection != null - && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) { + && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) { log.info("JobManager successfully registered at ResourceManager, leader id: {}.", - success.getResourceManagerLeaderId()); + success.getResourceManagerLeaderId()); } } }); } - private void disposeCommunicationWithResourceManager() { - // 1. stop the leader retriever so we will not receiving updates anymore - try { - resourceManagerLeaderRetriever.stop(); - } catch (Exception e) { - log.warn("Failed to stop resource manager leader retriever."); - } - - // 2. close current connection with ResourceManager if exists - closeResourceManagerConnection(); - } - private void closeResourceManagerConnection() { if (resourceManagerConnection != null) { resourceManagerConnection.close(); @@ -957,34 +774,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } //---------------------------------------------------------------------------------------------- - // Helper methods - //---------------------------------------------------------------------------------------------- - - /** - * Converts JobVertexIDs to corresponding ExecutionJobVertexes - * - * @param executionGraph The execution graph that holds the relationship - * @param vertexIDs The vertexIDs need to be converted - * @return The corresponding ExecutionJobVertexes - * @throws JobExecutionException - */ - private static List<ExecutionJobVertex> getExecutionJobVertexWithId( - final ExecutionGraph executionGraph, final List<JobVertexID> vertexIDs) - throws JobExecutionException - { - final List<ExecutionJobVertex> ret = new ArrayList<>(vertexIDs.size()); - for (JobVertexID vertexID : vertexIDs) { - final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(vertexID); - if (executionJobVertex == null) { - throw new JobExecutionException(executionGraph.getJobID(), - "The snapshot checkpointing settings refer to non-existent vertex " + vertexID); - } - ret.add(executionJobVertex); - } - return ret; - } - - //---------------------------------------------------------------------------------------------- // Utility classes //---------------------------------------------------------------------------------------------- @@ -1001,19 +790,19 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } private class ResourceManagerConnection - extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> + extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> { private final JobID jobID; private final UUID jobManagerLeaderID; ResourceManagerConnection( - final Logger log, - final JobID jobID, - final UUID jobManagerLeaderID, - final String resourceManagerAddress, - final UUID resourceManagerLeaderID, - final Executor executor) + final Logger log, + final JobID jobID, + final UUID jobManagerLeaderID, + final String resourceManagerAddress, + final UUID resourceManagerLeaderID, + final Executor executor) { super(log, resourceManagerAddress, resourceManagerLeaderID, executor); this.jobID = checkNotNull(jobID); @@ -1023,12 +812,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @Override protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() { return new RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess>( - log, getRpcService(), "ResourceManager", ResourceManagerGateway.class, - getTargetAddress(), getTargetLeaderId()) + log, getRpcService(), "ResourceManager", ResourceManagerGateway.class, + getTargetAddress(), getTargetLeaderId()) { @Override protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId, - long timeoutMillis) throws Exception + long timeoutMillis) throws Exception { Time timeout = Time.milliseconds(timeoutMillis); return gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 5223b3e..daa33a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobmaster; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -32,11 +31,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; -import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse; -import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateServerAddress; @@ -52,52 +47,54 @@ import java.util.UUID; */ public interface JobMasterGateway extends CheckpointCoordinatorGateway { - /** - * Starting the job under the given leader session ID. - */ - void startJob(final UUID leaderSessionID); + // ------------------------------------------------------------------------ + // Job start and stop methods + // ------------------------------------------------------------------------ - /** - * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared. - * Should re-submit the job before restarting it. - * - * @param cause The reason of why this job been suspended. - */ - void suspendJob(final Throwable cause); + void startJobExecution(); + + void suspendExecution(Throwable cause); + + // ------------------------------------------------------------------------ /** * Updates the task execution state for a given task. * + * @param leaderSessionID The leader id of JobManager * @param taskExecutionState New task execution state for a given task * @return Future flag of the task execution state update result */ - Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState); + Future<Acknowledge> updateTaskExecutionState( + final UUID leaderSessionID, + final TaskExecutionState taskExecutionState); /** * Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender * as a {@link SerializedInputSplit} message. * + * @param leaderSessionID The leader id of JobManager * @param vertexID The job vertex id * @param executionAttempt The execution attempt id * @return The future of the input split. If there is no further input split, will return an empty object. */ Future<SerializedInputSplit> requestNextInputSplit( - final JobVertexID vertexID, - final ExecutionAttemptID executionAttempt); + final UUID leaderSessionID, + final JobVertexID vertexID, + final ExecutionAttemptID executionAttempt); /** - * Requests the current state of the producer of an intermediate result partition. + * Requests the current state of the partition. * The state of a partition is currently bound to the state of the producing execution. * - * @param jobId TheID of job that the intermediate result partition belongs to. + * @param leaderSessionID The leader id of JobManager * @param intermediateResultId The execution attempt ID of the task requesting the partition state. * @param partitionId The partition ID of the partition to request the state of. * @return The future of the partition state */ Future<ExecutionState> requestPartitionState( - JobID jobId, - IntermediateDataSetID intermediateResultId, - ResultPartitionID partitionId); + final UUID leaderSessionID, + final IntermediateDataSetID intermediateResultId, + final ResultPartitionID partitionId); /** * Notifies the JobManager about available data for a produced partition. @@ -108,11 +105,15 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * <p> * The JobManager then can decide when to schedule the partition consumers of the given session. * - * @param partitionID The partition which has already produced data - * @param timeout before the rpc call fails + * @param leaderSessionID The leader id of JobManager + * @param partitionID The partition which has already produced data + * @param timeout before the rpc call fails * @return Future acknowledge of the schedule or update operation */ - Future<Acknowledge> scheduleOrUpdateConsumers(final ResultPartitionID partitionID, @RpcTimeout final Time timeout); + Future<Acknowledge> scheduleOrUpdateConsumers( + final UUID leaderSessionID, + final ResultPartitionID partitionID, + @RpcTimeout final Time timeout); /** * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the @@ -123,36 +124,12 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { void disconnectTaskManager(ResourceID resourceID); /** - * Notifies the JobManager about the removal of a resource. - * - * @param resourceId The ID under which the resource is registered. - * @param message Optional message with details, for logging and debugging. - */ - - void resourceRemoved(final ResourceID resourceId, final String message); - - /** - * Notifies the JobManager that the checkpoint of an individual task is completed. - * - * @param acknowledge The acknowledge message of the checkpoint - */ - void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge); - - /** - * Notifies the JobManager that a checkpoint request could not be heeded. - * This can happen if a Task is already in RUNNING state but is internally not yet ready to perform checkpoints. - * - * @param decline The decline message of the checkpoint - */ - void declineCheckpoint(final DeclineCheckpoint decline); - - /** * Requests a {@link KvStateLocation} for the specified {@link KvState} registration name. * * @param registrationName Name under which the KvState has been registered. * @return Future of the requested {@link KvState} location */ - Future<KvStateLocation> lookupKvStateLocation(final String registrationName) throws Exception; + Future<KvStateLocation> lookupKvStateLocation(final String registrationName); /** * @param jobVertexId JobVertexID the KvState instance belongs to. @@ -162,11 +139,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * @param kvStateServerAddress Server address where to find the KvState instance. */ void notifyKvStateRegistered( - final JobVertexID jobVertexId, - final KeyGroupRange keyGroupRange, - final String registrationName, - final KvStateID kvStateId, - final KvStateServerAddress kvStateServerAddress); + final JobVertexID jobVertexId, + final KeyGroupRange keyGroupRange, + final String registrationName, + final KvStateID kvStateId, + final KvStateServerAddress kvStateServerAddress); /** * @param jobVertexId JobVertexID the KvState instance belongs to. @@ -174,24 +151,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * @param registrationName Name under which the KvState has been registered. */ void notifyKvStateUnregistered( - JobVertexID jobVertexId, - KeyGroupRange keyGroupRange, - String registrationName); - - /** - * Notifies the JobManager to trigger a savepoint for this job. - * - * @return Future of the savepoint trigger response. - */ - Future<TriggerSavepointResponse> triggerSavepoint(); - - /** - * Notifies the Jobmanager to dispose specified savepoint. - * - * @param savepointPath The path of the savepoint. - * @return The future of the savepoint disponse response. - */ - Future<DisposeSavepointResponse> disposeSavepoint(final String savepointPath); + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName); /** * Request the classloading props of this job. http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java index e8fb5bb..019ccfe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.slf4j.Logger; @@ -62,6 +64,9 @@ public class MiniClusterJobDispatcher { /** al the services that the JobManager needs, such as BLOB service, factories, etc */ private final JobManagerServices jobManagerServices; + /** Registry for all metrics in the mini cluster */ + private final MetricRegistry metricRegistry; + /** The number of JobManagers to launch (more than one simulates a high-availability setup) */ private final int numJobManagers; @@ -86,8 +91,9 @@ public class MiniClusterJobDispatcher { public MiniClusterJobDispatcher( Configuration config, RpcService rpcService, - HighAvailabilityServices haServices) throws Exception { - this(config, rpcService, haServices, 1); + HighAvailabilityServices haServices, + MetricRegistry metricRegistry) throws Exception { + this(config, rpcService, haServices, metricRegistry, 1); } /** @@ -106,16 +112,18 @@ public class MiniClusterJobDispatcher { Configuration config, RpcService rpcService, HighAvailabilityServices haServices, + MetricRegistry metricRegistry, int numJobManagers) throws Exception { checkArgument(numJobManagers >= 1); this.configuration = checkNotNull(config); this.rpcService = checkNotNull(rpcService); this.haServices = checkNotNull(haServices); + this.metricRegistry = checkNotNull(metricRegistry); this.numJobManagers = numJobManagers; LOG.info("Creating JobMaster services"); - this.jobManagerServices = JobManagerServices.fromConfiguration(config); + this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices); } // ------------------------------------------------------------------------ @@ -140,9 +148,8 @@ public class MiniClusterJobDispatcher { if (runners != null) { this.runners = null; - Exception shutdownException = new Exception("The MiniCluster is shutting down"); for (JobManagerRunner runner : runners) { - runner.shutdown(shutdownException); + runner.shutdown(); } } } @@ -171,9 +178,9 @@ public class MiniClusterJobDispatcher { checkState(!shutdown, "mini cluster is shut down"); checkState(runners == null, "mini cluster can only execute one job at a time"); - OnCompletionActions onJobCompletion = new DetachedFinalizer(numJobManagers); + DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers); - this.runners = startJobRunners(job, onJobCompletion); + this.runners = startJobRunners(job, finalizer, finalizer); } } @@ -191,17 +198,17 @@ public class MiniClusterJobDispatcher { checkNotNull(job); LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID()); - final BlockingJobSync onJobCompletion = new BlockingJobSync(job.getJobID(), numJobManagers); + final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers); synchronized (lock) { checkState(!shutdown, "mini cluster is shut down"); checkState(runners == null, "mini cluster can only execute one job at a time"); - this.runners = startJobRunners(job, onJobCompletion); + this.runners = startJobRunners(job, sync, sync); } try { - return onJobCompletion.getResult(); + return sync.getResult(); } finally { // always clear the status for the next job @@ -209,24 +216,26 @@ public class MiniClusterJobDispatcher { } } - private JobManagerRunner[] startJobRunners(JobGraph job, OnCompletionActions onCompletion) throws JobExecutionException { + private JobManagerRunner[] startJobRunners( + JobGraph job, + OnCompletionActions onCompletion, + FatalErrorHandler errorHandler) throws JobExecutionException { LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID()); JobManagerRunner[] runners = new JobManagerRunner[numJobManagers]; for (int i = 0; i < numJobManagers; i++) { try { runners[i] = new JobManagerRunner(job, configuration, - rpcService, haServices, jobManagerServices, onCompletion); + rpcService, haServices, jobManagerServices, metricRegistry, + onCompletion, errorHandler); runners[i].start(); } catch (Throwable t) { // shut down all the ones so far - Exception shutdownCause = new Exception("Failed to properly start all mini cluster JobManagers", t); - for (int k = 0; k <= i; k++) { try { if (runners[i] != null) { - runners[i].shutdown(shutdownCause); + runners[i].shutdown(); } } catch (Throwable ignored) { // silent shutdown @@ -244,15 +253,15 @@ public class MiniClusterJobDispatcher { // test methods to simulate job master failures // ------------------------------------------------------------------------ - public void killJobMaster(int which) { - checkArgument(which >= 0 && which < numJobManagers, "no such job master"); - checkState(!shutdown, "mini cluster is shut down"); - - JobManagerRunner[] runners = this.runners; - checkState(runners != null, "mini cluster it not executing a job right now"); - - runners[which].shutdown(new Throwable("kill JobManager")); - } +// public void killJobMaster(int which) { +// checkArgument(which >= 0 && which < numJobManagers, "no such job master"); +// checkState(!shutdown, "mini cluster is shut down"); +// +// JobManagerRunner[] runners = this.runners; +// checkState(runners != null, "mini cluster it not executing a job right now"); +// +// runners[which].shutdown(new Throwable("kill JobManager")); +// } // ------------------------------------------------------------------------ // utility classes @@ -263,7 +272,7 @@ public class MiniClusterJobDispatcher { * In the case of a high-availability test setup, there may be multiple runners. * After that, it marks the mini cluster as ready to receive new jobs. */ - private class DetachedFinalizer implements OnCompletionActions { + private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler { private final AtomicInteger numJobManagersToWaitFor; @@ -308,7 +317,7 @@ public class MiniClusterJobDispatcher { * That way it is guaranteed that after the blocking job submit call returns, * the dispatcher is immediately free to accept another job. */ - private static class BlockingJobSync implements OnCompletionActions { + private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler { private final JobID jobId; http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java index 520755d..572ba2f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.blob.BlobKey; import java.io.Serializable; import java.net.URL; import java.util.Collection; -import java.util.List; /** * The response of classloading props request to JobManager. http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java deleted file mode 100644 index 42bfc71..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmaster.message; - -import java.io.Serializable; - -/** - * The response of the dispose savepoint request to JobManager. - */ -public abstract class DisposeSavepointResponse implements Serializable { - - private static final long serialVersionUID = 6008792963949369567L; - - public static class Success extends DisposeSavepointResponse implements Serializable { - - private static final long serialVersionUID = 1572462960008711415L; - } - - public static class Failure extends DisposeSavepointResponse implements Serializable { - - private static final long serialVersionUID = -7505308325483022458L; - - private final Throwable cause; - - public Failure(final Throwable cause) { - this.cause = cause; - } - - public Throwable getCause() { - return cause; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java deleted file mode 100644 index 0b0edc5..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmaster.message; - -import org.apache.flink.api.common.JobID; - -import java.io.Serializable; - -/** - * The response of the trigger savepoint request to JobManager. - */ -public abstract class TriggerSavepointResponse implements Serializable { - - private static final long serialVersionUID = 3139327824611807707L; - - private final JobID jobID; - - public JobID getJobID() { - return jobID; - } - - public TriggerSavepointResponse(final JobID jobID) { - this.jobID = jobID; - } - - public static class Success extends TriggerSavepointResponse implements Serializable { - - private static final long serialVersionUID = -1100637460388881776L; - - private final String savepointPath; - - public Success(final JobID jobID, final String savepointPath) { - super(jobID); - this.savepointPath = savepointPath; - } - - public String getSavepointPath() { - return savepointPath; - } - } - - public static class Failure extends TriggerSavepointResponse implements Serializable { - - private static final long serialVersionUID = -1668479003490615139L; - - private final Throwable cause; - - public Failure(final JobID jobID, final Throwable cause) { - super(jobID); - this.cause = cause; - } - - public Throwable getCause() { - return cause; - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 2052f98..4b9100a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit; public interface RpcService { /** - * Return the address under which the rpc service can be reached. If the rpc service cannot be - * contacted remotely, then it will return an empty string. + * Return the hostname or host address under which the rpc service can be reached. + * If the rpc service cannot be contacted remotely, then it will return an empty string. * * @return Address of the rpc service or empty string if local rpc service */ http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java index 72668d2..1b311e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java @@ -26,11 +26,16 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.util.Preconditions; +import java.util.UUID; + /** * Container class for JobManager specific communication utils used by the {@link TaskExecutor}. */ public class JobManagerConnection { + // Job master leader session id + private final UUID jobMasterLeaderId; + // Gateway to the job master private final JobMasterGateway jobMasterGateway; @@ -50,13 +55,15 @@ public class JobManagerConnection { private final PartitionProducerStateChecker partitionStateChecker; public JobManagerConnection( - JobMasterGateway jobMasterGateway, - TaskManagerActions taskManagerActions, - CheckpointResponder checkpointResponder, - LibraryCacheManager libraryCacheManager, - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, - PartitionProducerStateChecker partitionStateChecker) { - + UUID jobMasterLeaderId, + JobMasterGateway jobMasterGateway, + TaskManagerActions taskManagerActions, + CheckpointResponder checkpointResponder, + LibraryCacheManager libraryCacheManager, + ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, + PartitionProducerStateChecker partitionStateChecker) + { + this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId); this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions); this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder); @@ -65,6 +72,10 @@ public class JobManagerConnection { this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker); } + public UUID getJobMasterLeaderId() { + return jobMasterLeaderId; + } + public JobMasterGateway getJobManagerGateway() { return jobMasterGateway; } http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 36f108e..2389291 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -18,37 +18,46 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.executiongraph.JobInformation; -import org.apache.flink.runtime.executiongraph.TaskInformation; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; +import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException; import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskException; @@ -62,26 +71,16 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcMethod; -import org.apache.flink.runtime.rpc.RpcService; - import org.apache.flink.util.Preconditions; -import java.util.HashSet; -import java.util.Set; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.UUID; import static org.apache.flink.util.Preconditions.checkArgument; @@ -292,6 +291,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { tdd.getAttemptNumber()); InputSplitProvider inputSplitProvider = new RpcInputSplitProvider( + jobManagerConnection.getJobMasterLeaderId(), jobManagerConnection.getJobManagerGateway(), jobInformation.getJobId(), taskInformation.getJobVertexId(), @@ -605,10 +605,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { clearTasks(); } - private void updateTaskExecutionState(final JobMasterGateway jobMasterGateway, final TaskExecutionState taskExecutionState) { + private void updateTaskExecutionState( + final UUID jobMasterLeaderId, + final JobMasterGateway jobMasterGateway, + final TaskExecutionState taskExecutionState) + { final ExecutionAttemptID executionAttemptID = taskExecutionState.getID(); - Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState); + Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState( + jobMasterLeaderId, taskExecutionState); futureAcknowledge.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { @Override @@ -620,7 +625,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { }, getMainThreadExecutor()); } - private void unregisterTaskAndNotifyFinalState(final JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) { + private void unregisterTaskAndNotifyFinalState( + final UUID jobMasterLeaderId, + final JobMasterGateway jobMasterGateway, + final ExecutionAttemptID executionAttemptID) + { Task task = removeTask(executionAttemptID); if (task != null) { @@ -638,14 +647,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { AccumulatorSnapshot accumulatorSnapshot = task.getAccumulatorRegistry().getSnapshot(); updateTaskExecutionState( - jobMasterGateway, - new TaskExecutionState( - task.getJobID(), - task.getExecutionId(), - task.getExecutionState(), - task.getFailureCause(), - accumulatorSnapshot, - task.getMetricGroup().getIOMetricGroup().createSnapshot())); + jobMasterLeaderId, + jobMasterGateway, + new TaskExecutionState( + task.getJobID(), + task.getExecutionId(), + task.getExecutionState(), + task.getFailureCause(), + accumulatorSnapshot, + task.getMetricGroup().getIOMetricGroup().createSnapshot())); } else { log.error("Cannot find task with ID {} to unregister.", executionAttemptID); } @@ -687,11 +697,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } - private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, int blobPort) { + private JobManagerConnection associateWithJobManager(UUID jobMasterLeaderId, + JobMasterGateway jobMasterGateway, int blobPort) + { + Preconditions.checkNotNull(jobMasterLeaderId); Preconditions.checkNotNull(jobMasterGateway); Preconditions.checkArgument(blobPort > 0 || blobPort <= 65535, "Blob port is out of range."); - TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway); + TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterLeaderId, jobMasterGateway); CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway); @@ -704,19 +717,21 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { taskManagerConfiguration.getCleanupInterval()); ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier( - jobMasterGateway, - getRpcService().getExecutor(), - taskManagerConfiguration.getTimeout()); + jobMasterLeaderId, + jobMasterGateway, + getRpcService().getExecutor(), + taskManagerConfiguration.getTimeout()); - PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway); + PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterLeaderId, jobMasterGateway); return new JobManagerConnection( - jobMasterGateway, - taskManagerActions, - checkpointResponder, - libraryCacheManager, - resultPartitionConsumableNotifier, - partitionStateChecker); + jobMasterLeaderId, + jobMasterGateway, + taskManagerActions, + checkpointResponder, + libraryCacheManager, + resultPartitionConsumableNotifier, + partitionStateChecker); } private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException { @@ -808,9 +823,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } private class TaskManagerActionsImpl implements TaskManagerActions { + private final UUID jobMasterLeaderId; private final JobMasterGateway jobMasterGateway; - private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) { + private TaskManagerActionsImpl(UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway) { + this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId); this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); } @@ -819,7 +836,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { runAsync(new Runnable() { @Override public void run() { - unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID); + unregisterTaskAndNotifyFinalState(jobMasterLeaderId, jobMasterGateway, executionAttemptID); } }); } @@ -842,7 +859,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { @Override public void updateTaskExecutionState(final TaskExecutionState taskExecutionState) { - TaskExecutor.this.updateTaskExecutionState(jobMasterGateway, taskExecutionState); + TaskExecutor.this.updateTaskExecutionState(jobMasterLeaderId, jobMasterGateway, taskExecutionState); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java index 4850d63..3b9da48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java @@ -31,7 +31,10 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; +import java.util.UUID; + public class RpcInputSplitProvider implements InputSplitProvider { + private final UUID jobMasterLeaderId; private final JobMasterGateway jobMasterGateway; private final JobID jobID; private final JobVertexID jobVertexID; @@ -39,11 +42,13 @@ public class RpcInputSplitProvider implements InputSplitProvider { private final Time timeout; public RpcInputSplitProvider( + UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway, JobID jobID, JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID, Time timeout) { + this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId); this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); this.jobID = Preconditions.checkNotNull(jobID); this.jobVertexID = Preconditions.checkNotNull(jobVertexID); @@ -56,7 +61,8 @@ public class RpcInputSplitProvider implements InputSplitProvider { public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException { Preconditions.checkNotNull(userCodeClassLoader); - Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(jobVertexID, executionAttemptID); + Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit( + jobMasterLeaderId, jobVertexID, executionAttemptID); try { SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());
