Repository: incubator-samza Updated Branches: refs/heads/master 354bcdb77 -> 300ad6a8c
SAMZA-218: Show container start-time and up-time on YARN AM web Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/300ad6a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/300ad6a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/300ad6a8 Branch: refs/heads/master Commit: 300ad6a8cfdd22f5a75260afd3a0e6fa87797b61 Parents: 354bcdb Author: Zhijie Shen <[email protected]> Authored: Thu May 1 20:29:06 2014 -0700 Committer: Zhijie Shen <[email protected]> Committed: Thu May 1 20:29:06 2014 -0700 ---------------------------------------------------------------------- build.gradle | 1 + gradle/dependency-versions.gradle | 1 + .../resources/scalate/WEB-INF/views/index.scaml | 8 +++- .../samza/job/yarn/SamzaAppMasterState.scala | 4 +- .../job/yarn/SamzaAppMasterTaskManager.scala | 4 +- .../apache/samza/job/yarn/YarnContainer.scala | 47 ++++++++++++++++++++ .../webapp/ApplicationMasterRestServlet.scala | 8 ++-- 7 files changed, 63 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 055ef9b..72928b1 100644 --- a/build.gradle +++ b/build.gradle @@ -146,6 +146,7 @@ project(":samza-yarn_$scalaVersion") { exclude module: 'scala-compiler' exclude module: 'slf4j-api' } + compile "joda-time:joda-time:$jodaTimeVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 4338b23..819a578 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -1,4 +1,5 @@ ext { + jodaTimeVersion = "2.2" joptSimpleVersion = "3.2" jacksonVersion = "1.8.5" junitVersion = "4.8.1" http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml index 6530bad..d17b9c4 100644 --- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml +++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml @@ -63,9 +63,13 @@ Task group #{taskId.toString} %ul %li - %a(target="_blank" href="http://#{container.getNodeHttpAddress}/node/containerlogs/#{container.getId.toString}/#{username}")= container.getId.toString + %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString %li - %a(target="_blank" href="http://#{container.getNodeHttpAddress}")= container.getNodeHttpAddress + %a(target="_blank" href="http://#{container.nodeHttpAddress}")= container.nodeHttpAddress + %li + Start time: #{container.startTimeStr()} + %li + Up time: #{container.upTimeStr()} %tr %td Completed %td= state.completedTasks.toString http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala index fa1642b..01a2683 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala @@ -18,8 +18,6 @@ */ package org.apache.samza.job.yarn -import org.apache.hadoop.yarn.api.records.ContainerStatus -import org.apache.hadoop.yarn.api.records.Container import org.apache.samza.config.Config import grizzled.slf4j.Logging import org.apache.hadoop.yarn.api.records.FinalApplicationStatus @@ -41,7 +39,7 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod var taskCount = 0 var unclaimedTasks = Set[Int]() var finishedTasks = Set[Int]() - var runningTasks = Map[Int, Container]() + var runningTasks = Map[Int, YarnContainer]() var taskPartitions = Map[Int, Set[Partition]]() var status = FinalApplicationStatus.UNDEFINED http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala index 58b2d30..eb1ff54 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala @@ -127,7 +127,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/%s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)) state.neededContainers -= 1 - state.runningTasks += taskId -> container + state.runningTasks += taskId -> new YarnContainer(container) state.unclaimedTasks -= taskId state.taskPartitions += taskId -> streamsAndPartitionsForTask.map(_.getPartition).toSet @@ -146,7 +146,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA override def onContainerCompleted(containerStatus: ContainerStatus) { val containerIdStr = ConverterUtils.toString(containerStatus.getContainerId) - val taskId = state.runningTasks.filter { case (_, container) => container.getId().equals(containerStatus.getContainerId()) }.keys.headOption + val taskId = state.runningTasks.filter { case (_, container) => container.id.equals(containerStatus.getContainerId()) }.keys.headOption taskId match { case Some(taskId) => { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala new file mode 100644 index 0000000..7ab866f --- /dev/null +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.yarn + +import org.apache.hadoop.yarn.api.records.Container +import org.joda.time.Period +import org.joda.time.format.{ DateTimeFormatter, ISODateTimeFormat, ISOPeriodFormat, PeriodFormatter } + +object YarnContainerUtils { + val dateFormater = ISODateTimeFormat.dateTime + val periodFormater = ISOPeriodFormat.standard +} + +/** + * YARN container information plus start time and up time + */ +class YarnContainer(container: Container) { + val id = container.getId() + val nodeId = container.getNodeId(); + val nodeHttpAddress = container.getNodeHttpAddress(); + val resource = container.getResource(); + val priority = container.getPriority(); + val containerToken = container.getContainerToken(); + val startTime = System.currentTimeMillis() + def startTimeStr(dtFormatter: Option[DateTimeFormatter] = None) = + dtFormatter.getOrElse(YarnContainerUtils.dateFormater).print(startTime) + def upTime = System.currentTimeMillis() + def upTimeStr(periodFormatter: Option[PeriodFormatter] = None) = + periodFormatter.getOrElse(YarnContainerUtils.periodFormater).print(new Period(startTime, upTime)) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala index 8fce8a7..17a96f0 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala @@ -78,12 +78,14 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, r val containers = new HashMap[String, HashMap[String, Object]] state.runningTasks.values.foreach(c => { - val containerIdStr = c.getId.toString + val containerIdStr = c.id.toString val containerMap = new HashMap[String, Object] - val taskId = state.runningTasks.filter { case (_, container) => container.getId.toString.equals(containerIdStr) }.keys.head + val taskId = state.runningTasks.filter { case (_, container) => container.id.toString.equals(containerIdStr) }.keys.head var partitions = new java.util.ArrayList(state.taskPartitions.get(taskId).get) - containerMap.put("yarn-address", c.getNodeHttpAddress) + containerMap.put("yarn-address", c.nodeHttpAddress) + containerMap.put("start-time", c.startTime.toString) + containerMap.put("up-time", c.upTime.toString) containerMap.put("partitions", partitions) containerMap.put("task-id", taskId.toString) containers.put(containerIdStr, containerMap)
