[FLINK-4986] Improvements to the JobMaster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8730e200 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8730e200 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8730e200 Branch: refs/heads/master Commit: 8730e200864427dd5d6ddb9f841978d68ab452bd Parents: 91f1d09 Author: Stephan Ewen <[email protected]> Authored: Mon Oct 17 20:26:58 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:25 2016 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 15 + .../flink/runtime/jobmaster/JobMaster.java | 336 ++++++++++--------- 2 files changed, 184 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8730e200/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index cbb4c7e..2025fc2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -621,6 +621,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } /** + * Gets the accumulator results. 
+ */ + public Map<String, Object> getAccumulators() throws IOException { + + Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators(); + + Map<String, Object> result = new HashMap<>(); + for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) { + result.put(entry.getKey(), entry.getValue().getLocalValue()); + } + + return result; + } + + /** * Gets a serialized accumulator map. * @return The accumulator map with serialized accumulator values. * @throws IOException http://git-wip-us.apache.org/repos/asf/flink/blob/8730e200/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 458bf0c..0b3b68e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobmaster; +import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; @@ -33,7 +34,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.client.SerializedJobExecutionResult; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.BiFunction; @@ -51,6 +51,7 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener; import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.LeaderIdMismatchException; import org.apache.flink.runtime.instance.Slot; import org.apache.flink.runtime.instance.SlotDescriptor; import org.apache.flink.runtime.instance.SlotPool; @@ -90,7 +91,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import javax.annotation.Nullable; @@ -246,6 +246,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { -1, log); + // register self as job status change listener + executionGraph.registerJobStatusListener(new JobManagerJobStatusListener()); + this.slotPool = new SlotPool(executorService); this.allocationTimeout = Time.of(5, TimeUnit.SECONDS); @@ -269,13 +272,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { */ public void start(final UUID leaderSessionID) throws Exception { if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) { - super.start(); + // make sure the slot pool now accepts messages for this leader slotPool.setJobManagerLeaderId(leaderSessionID); - log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID()); + + // make sure we receive RPC and async calls + super.start(); + + log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID()); getSelf().startJobExecution(); - } else { - log.warn("Job already started with leaderId {}, ignoring this start request.", leaderSessionID); + } + else { + log.warn("Job already started with leader ID {}, ignoring this start request.", leaderSessionID); } 
} @@ -297,48 +305,21 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @RpcMethod public void startJobExecution() { - log.info("Starting execution of job {} ({}) with leaderId {}.", - jobGraph.getName(), jobGraph.getJobID(), leaderSessionID); + log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID()); try { - // register self as job status change listener - executionGraph.registerJobStatusListener(new JobStatusListener() { - @Override - public void jobStatusChanges( - final JobID jobId, final JobStatus newJobStatus, final long timestamp, final Throwable error) - { - // run in rpc thread to avoid concurrency - runAsync(new Runnable() { - @Override - public void run() { - jobStatusChanged(newJobStatus, timestamp, error); - } - }); - } - }); - // job is ready to go, try to establish connection with resource manager + // - activate leader retrieval for the resource manager + // - on notification of the leader, the connection will be established and + // the slot pool will start requesting slots resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); - } catch (Throwable t) { - - // TODO - this should not result in a job failure, but another leader should take over - // TODO - either this master should retry the execution, or it should relinquish leadership / terminate - + } + catch (Throwable t) { log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t); - executionGraph.fail(t); - - final JobExecutionException rt; - if (t instanceof JobExecutionException) { - rt = (JobExecutionException) t; - } else { - rt = new JobExecutionException(jobGraph.getJobID(), - "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t); - } - - // TODO: notify client about this failure + handleFatalError(new Exception( + "Could not start job execution: Failed to start leader service for Resource Manager", t)); - jobCompletionActions.jobFailed(rt); return; } @@ -348,7 +329,8 
@@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { public void run() { try { executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout)); - } catch (Throwable t) { + } + catch (Throwable t) { executionGraph.fail(t); } } @@ -386,6 +368,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } closeResourceManagerConnection(); + // TODO: in the future, the slot pool should not release the resources, so that + // TODO: the TaskManagers offer the resources to the new leader for (ResourceID taskManagerId : registeredTaskManagers.keySet()) { slotPool.releaseResource(taskManagerId); } @@ -405,14 +389,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { final UUID leaderSessionID, final TaskExecutionState taskExecutionState) throws Exception { - if (taskExecutionState == null) { - throw new NullPointerException("TaskExecutionState must not be null."); - } - - if (!this.leaderSessionID.equals(leaderSessionID)) { - throw new Exception("Leader id not match, expected: " + this.leaderSessionID - + ", actual: " + leaderSessionID); - } + checkNotNull(taskExecutionState, "taskExecutionState"); + validateLeaderSessionId(leaderSessionID); if (executionGraph.updateState(taskExecutionState)) { return Acknowledge.get(); @@ -428,10 +406,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) throws Exception { - if (!this.leaderSessionID.equals(leaderSessionID)) { - throw new Exception("Leader id not match, expected: " + this.leaderSessionID - + ", actual: " + leaderSessionID); - } + validateLeaderSessionId(leaderSessionID); final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); if (execution == null) { @@ -477,16 +452,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } } - @RpcMethod public ExecutionState requestPartitionState( final UUID leaderSessionID, final 
IntermediateDataSetID intermediateResultId, final ResultPartitionID resultPartitionId) throws Exception { - if (!this.leaderSessionID.equals(leaderSessionID)) { - throw new Exception("Leader id not match, expected: " + this.leaderSessionID - + ", actual: " + leaderSessionID); - } + validateLeaderSessionId(leaderSessionID); final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId()); if (execution != null) { @@ -520,10 +491,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { final UUID leaderSessionID, final ResultPartitionID partitionID) throws Exception { - if (!this.leaderSessionID.equals(leaderSessionID)) { - throw new Exception("Leader id not match, expected: " + this.leaderSessionID - + ", actual: " + leaderSessionID); - } + validateLeaderSessionId(leaderSessionID); executionGraph.scheduleOrUpdateConsumers(partitionID); return Acknowledge.get(); @@ -534,6 +502,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { throw new UnsupportedOperationException(); } + // TODO: This method needs a leader session ID @RpcMethod public void acknowledgeCheckpoint( final JobID jobID, @@ -562,6 +531,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } } + // TODO: This method needs a leader session ID @RpcMethod public void declineCheckpoint( final JobID jobID, @@ -657,10 +627,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId, final Iterable<SlotOffer> slots, final UUID leaderId) throws Exception { - if (!this.leaderSessionID.equals(leaderId)) { - throw new Exception("Leader id not match, expected: " + this.leaderSessionID - + ", actual: " + leaderId); - } + validateLeaderSessionId(leaderId); Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId); if (taskManager == null) { @@ -689,10 +656,7 @@ public class JobMaster extends
RpcEndpoint<JobMasterGateway> { final UUID leaderId, final Exception cause) throws Exception { - if (!this.leaderSessionID.equals(leaderId)) { - throw new Exception("Leader id not match, expected: " + this.leaderSessionID - + ", actual: " + leaderId); - } + validateLeaderSessionId(leaderId); if (!registeredTaskManagers.containsKey(taskManagerId)) { throw new Exception("Unknown TaskManager " + taskManagerId); @@ -782,62 +746,55 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { + validateRunsInMainThread(); + final JobID jobID = executionGraph.getJobID(); final String jobName = executionGraph.getJobName(); + log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error); if (newJobStatus.isGloballyTerminalState()) { - // TODO set job end time in JobInfo - - /* - TODO - if (jobInfo.sessionAlive) { - jobInfo.setLastActive() - val lastActivity = jobInfo.lastActive - context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { - // remove only if no activity occurred in the meantime - if (lastActivity == jobInfo.lastActive) { - self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) - } - }(context.dispatcher) - } else { - self ! 
decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) - } - */ - - if (newJobStatus == JobStatus.FINISHED) { - try { - final Map<String, SerializedValue<Object>> accumulatorResults = - executionGraph.getAccumulatorsSerialized(); - final SerializedJobExecutionResult result = new SerializedJobExecutionResult( - jobID, 0, accumulatorResults // TODO get correct job duration - ); - jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader)); - } catch (Exception e) { - log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e); + switch (newJobStatus) { + case FINISHED: + try { + // TODO get correct job duration + // job done, let's get the accumulators + Map<String, Object> accumulatorResults = executionGraph.getAccumulators(); + JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults); + + jobCompletionActions.jobFinished(result); + } + catch (Exception e) { + log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e); + + final JobExecutionException exception = new JobExecutionException(jobID, + "Failed to retrieve accumulator results. " + + "The job is registered as 'FINISHED' (successful), but this notification describes " + + "a failure, since the resulting accumulators could not be fetched.", e); + + jobCompletionActions.jobFailed(exception); + } + break; + + case CANCELED: { final JobExecutionException exception = new JobExecutionException( - jobID, "Failed to retrieve accumulator results.", e); - // TODO should we also notify client? + jobID, "Job was cancelled.", new Exception("The job was cancelled")); + jobCompletionActions.jobFailed(exception); + break; } - } else if (newJobStatus == JobStatus.CANCELED) { - final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); - final JobExecutionException exception = new JobExecutionException( - jobID, "Job was cancelled.", unpackedError); - // TODO should we also notify client? 
- jobCompletionActions.jobFailed(exception); - } else if (newJobStatus == JobStatus.FAILED) { - final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); - final JobExecutionException exception = new JobExecutionException( - jobID, "Job execution failed.", unpackedError); - // TODO should we also notify client? - jobCompletionActions.jobFailed(exception); - } else { - final JobExecutionException exception = new JobExecutionException( - jobID, newJobStatus + " is not a terminal state."); - // TODO should we also notify client? - jobCompletionActions.jobFailed(exception); - throw new RuntimeException(exception); + + case FAILED: { + final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); + final JobExecutionException exception = new JobExecutionException( + jobID, "Job execution failed.", unpackedError); + jobCompletionActions.jobFailed(exception); + break; + } + + default: + // this can happen only if the enum is buggy + throw new IllegalStateException(newJobStatus.toString()); } } } @@ -845,57 +802,52 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { private void notifyOfNewResourceManagerLeader( final String resourceManagerAddress, final UUID resourceManagerLeaderId) { - // IMPORTANT: executed by main thread to avoid concurrence - runAsync(new Runnable() { - @Override - public void run() { - if (resourceManagerConnection != null) { - if (resourceManagerAddress != null) { - if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress()) - && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) { - // both address and leader id are not changed, we can keep the old connection - return; - } - log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getTargetAddress(), resourceManagerAddress); - } else { - log.info("Current ResourceManager {} lost leader status. 
Waiting for new ResourceManager leader.", - resourceManagerConnection.getTargetAddress()); - } - } + validateRunsInMainThread(); - closeResourceManagerConnection(); - - if (resourceManagerAddress != null) { - log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); - resourceManagerConnection = new ResourceManagerConnection( - log, jobGraph.getJobID(), leaderSessionID, - resourceManagerAddress, resourceManagerLeaderId, executionContext); - resourceManagerConnection.start(); + if (resourceManagerConnection != null) { + if (resourceManagerAddress != null) { + if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress()) + && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) { + // both address and leader id are not changed, we can keep the old connection + return; } + log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", + resourceManagerConnection.getTargetAddress(), resourceManagerAddress); + } else { + log.info("Current ResourceManager {} lost leader status. 
Waiting for new ResourceManager leader.", + resourceManagerConnection.getTargetAddress()); } - }); + } + + closeResourceManagerConnection(); + + if (resourceManagerAddress != null) { + log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); + resourceManagerConnection = new ResourceManagerConnection( + log, jobGraph.getJobID(), getAddress(), leaderSessionID, + resourceManagerAddress, resourceManagerLeaderId, executionContext); + resourceManagerConnection.start(); + } } private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) { - getRpcService().execute(new Runnable() { - @Override - public void run() { - // TODO - add tests for comment in https://github.com/apache/flink/pull/2565 - // verify the response with current connection - if (resourceManagerConnection != null - && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) - { - log.info("JobManager successfully registered at ResourceManager, leader id: {}.", - success.getResourceManagerLeaderId()); - slotPool.setResourceManager(success.getResourceManagerLeaderId(), - resourceManagerConnection.getTargetGateway()); - } - } - }); + validateRunsInMainThread(); + + // verify the response with current connection + if (resourceManagerConnection != null + && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) + { + log.info("JobManager successfully registered at ResourceManager, leader id: {}.", + success.getResourceManagerLeaderId()); + + slotPool.setResourceManager( + success.getResourceManagerLeaderId(), resourceManagerConnection.getTargetGateway()); + } } private void closeResourceManagerConnection() { + validateRunsInMainThread(); + if (resourceManagerConnection != null) { resourceManagerConnection.close(); resourceManagerConnection = null; @@ -903,32 +855,49 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { slotPool.disconnectResourceManager(); } + private 
void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException { + if (this.leaderSessionID == null || !this.leaderSessionID.equals(leaderSessionID)) { + throw new LeaderIdMismatchException(this.leaderSessionID, leaderSessionID); + } + } + //---------------------------------------------------------------------------------------------- // Utility classes //---------------------------------------------------------------------------------------------- private class ResourceManagerLeaderListener implements LeaderRetrievalListener { + @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { - notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID); + runAsync(new Runnable() { + @Override + public void run() { + notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID); + } + }); } @Override public void handleError(final Exception exception) { - handleFatalError(exception); + handleFatalError(new Exception("Fatal error in the ResourceManager leader service", exception)); } } + //---------------------------------------------------------------------------------------------- + private class ResourceManagerConnection extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> { private final JobID jobID; + private final String jobManagerRpcAddress; + private final UUID jobManagerLeaderID; ResourceManagerConnection( final Logger log, final JobID jobID, + final String jobManagerRpcAddress, final UUID jobManagerLeaderID, final String resourceManagerAddress, final UUID resourceManagerLeaderID, @@ -936,6 +905,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { { super(log, resourceManagerAddress, resourceManagerLeaderID, executor); this.jobID = checkNotNull(jobID); + this.jobManagerRpcAddress = checkNotNull(jobManagerRpcAddress); this.jobManagerLeaderID = checkNotNull(jobManagerLeaderID); } @@ -946,18 +916,29 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> { getTargetAddress(), getTargetLeaderId()) { @Override - protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId, - long timeoutMillis) throws Exception + protected Future<RegistrationResponse> invokeRegistration( + ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception { Time timeout = Time.milliseconds(timeoutMillis); - return gateway.registerJobManager(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout); + + return gateway.registerJobManager( + leaderId, + jobManagerLeaderID, + jobManagerRpcAddress, + jobID, + timeout); } }; } @Override protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) { - onResourceManagerRegistrationSuccess(success); + runAsync(new Runnable() { + @Override + public void run() { + onResourceManagerRegistrationSuccess(success); + } + }); } @Override @@ -965,4 +946,25 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { handleFatalError(failure); } } + + //---------------------------------------------------------------------------------------------- + + private class JobManagerJobStatusListener implements JobStatusListener { + + @Override + public void jobStatusChanges( + final JobID jobId, + final JobStatus newJobStatus, + final long timestamp, + final Throwable error) { + + // run in rpc thread to avoid concurrency + runAsync(new Runnable() { + @Override + public void run() { + jobStatusChanged(newJobStatus, timestamp, error); + } + }); + } + } }
