Repository: flink
Updated Branches:
  refs/heads/master 7167218fe -> 02459d244

Fix indentation for JobManager ScheduleOrUpdateConsumers and space for if-else.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02459d24
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02459d24
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02459d24

Branch: refs/heads/master
Commit: 02459d2445d0e03f7c20b79dd1c4574d6d845f83
Parents: 7167218
Author: Henry Saputra <[email protected]>
Authored: Wed Mar 18 18:07:44 2015 -0700
Committer: Henry Saputra <[email protected]>
Committed: Wed Mar 18 18:07:44 2015 -0700

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 28 ++++++++++----------
 1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02459d24/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index fe66b37..5fc9faa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -123,7 +123,7 @@ class JobManager(val configuration: Configuration,
     archive ! PoisonPill
     profiler.foreach( ref => ref ! PoisonPill )
 
-    for((e,_) <- currentJobs.values){
+    for((e,_) <- currentJobs.values) {
       e.fail(new Exception("The JobManager is shutting down."))
     }
 
@@ -136,7 +136,7 @@ class JobManager(val configuration: Configuration,
       case e: IOException => log.error(e, "Could not properly shutdown the library cache manager.")
     }
 
-    if(log.isDebugEnabled) {
+    if (log.isDebugEnabled) {
       log.debug("Job manager {} is completely stopped.", self.path)
     }
   }
@@ -151,7 +151,7 @@ class JobManager(val configuration: Configuration,
     case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) =>
       val taskManager = sender
 
-      if(instanceManager.isRegistered(taskManager)) {
+      if (instanceManager.isRegistered(taskManager)) {
         val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
         taskManager ! AlreadyRegistered(instanceID, libraryCacheManager.getBlobServerPort, profiler)
       } else {
@@ -200,7 +200,7 @@ class JobManager(val configuration: Configuration,
       }
 
     case UpdateTaskExecutionState(taskExecutionState) =>
-      if(taskExecutionState == null){
+      if (taskExecutionState == null) {
         sender ! false
       } else {
         currentJobs.get(taskExecutionState.getJobID) match {
@@ -224,16 +224,16 @@ class JobManager(val configuration: Configuration,
         case Some((executionGraph,_)) =>
           val execution = executionGraph.getRegisteredExecutions.get(executionAttempt)
 
-          if(execution == null){
+          if (execution == null) {
             log.error("Can not find Execution for attempt {}.", executionAttempt)
             null
-          }else{
+          } else {
             val slot = execution.getAssignedResource
             val taskId = execution.getVertex.getParallelSubtaskIndex
 
-            val host = if(slot != null){
+            val host = if (slot != null) {
               slot.getInstance().getInstanceConnectionInfo.getHostname
-            }else{
+            } else {
               null
             }
 
@@ -242,7 +242,7 @@ class JobManager(val configuration: Configuration,
                 case splitAssigner: InputSplitAssigner =>
                   val nextInputSplit = splitAssigner.getNextInputSplit(host, taskId)
 
-                  if(log.isDebugEnabled) {
+                  if (log.isDebugEnabled) {
                     log.debug("Send next input split {}.", nextInputSplit)
                   }
 
@@ -278,9 +278,9 @@ class JobManager(val configuration: Configuration,
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
           log.info("Status of job {} ({}) changed to {} {}.",
             jobID, executionGraph.getJobName, newJobStatus,
-            if(error == null) "" else error.getMessage)
+            if (error == null) "" else error.getMessage)
 
-          if(newJobStatus.isTerminalState) {
+          if (newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
 
             // is the client waiting for the job result?
@@ -321,7 +321,7 @@ class JobManager(val configuration: Configuration,
         case None =>
       }
 
-case ScheduleOrUpdateConsumers(jobId, partitionId) =>
+    case ScheduleOrUpdateConsumers(jobId, partitionId) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
           sender ! Acknowledge
@@ -406,7 +406,7 @@ case ScheduleOrUpdateConsumers(jobId, partitionId) =>
       taskManager forward SendStackTrace
 
     case Terminated(taskManager) =>
-      if(instanceManager.isRegistered(taskManager)) {
+      if (instanceManager.isRegistered(taskManager)) {
         log.info("Task manager {} terminated.", taskManager.path)
 
         instanceManager.unregisterTaskManager(taskManager)
@@ -419,7 +419,7 @@ case ScheduleOrUpdateConsumers(jobId, partitionId) =>
     case Disconnect(msg) =>
       val taskManager = sender
 
-      if(instanceManager.isRegistered(taskManager)){
+      if (instanceManager.isRegistered(taskManager)) {
         log.info("Task manager {} wants to disconnect, because {}.", taskManager.path, msg)
 
         instanceManager.unregisterTaskManager(taskManager)
