Repository: samza Updated Branches: refs/heads/master 92ae4c628 -> a1e03af0d
SAMZA-1250: JobRunner.kill doesn't terminate cleanly with YarnJob. 1. The ClientHelper now checks inactive application IDs so it can get the status of terminated jobs in addition to running jobs. 2. JobRunner.kill() waits for any finish, not just a successful finish. 3. A killed job is now considered successful. Author: Jacob Maes <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #152 from jmakes/samza-1250 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a1e03af0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a1e03af0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a1e03af0 Branch: refs/heads/master Commit: a1e03af0da684184fbd95ec4278fc35fe4b4e28b Parents: 92ae4c6 Author: Jacob Maes <[email protected]> Authored: Mon May 1 13:44:54 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Mon May 1 13:44:54 2017 -0700 ---------------------------------------------------------------------- .../scala/org/apache/samza/job/JobRunner.scala | 2 +- .../apache/samza/job/yarn/ClientHelper.scala | 38 +++++++++++++------- .../org/apache/samza/job/yarn/YarnJob.scala | 17 +++++---- 3 files changed, 38 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a1e03af0/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index b2f5bd0..f34db99 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -137,7 +137,7 @@ class JobRunner(config: Config) extends Logging { info("waiting for job to terminate") // Wait until the job has terminated, then exit. 
- Option(job.waitForStatus(SuccessfulFinish, 5000)) match { + Option(job.waitForFinish(5000)) match { case Some(appStatus) => { if (SuccessfulFinish.equals(appStatus)) { info("job terminated successfully - " + appStatus) http://git-wip-us.apache.org/repos/asf/samza/blob/a1e03af0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala index dc1ead3..f4fc757 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala @@ -236,22 +236,31 @@ class ClientHelper(conf: Configuration) extends Logging { * @return the active application ids. */ def getActiveApplicationIds(appName: String): List[ApplicationId] = { - val getAppsRsp = yarnClient.getApplications + val applicationReports = yarnClient.getApplications - getAppsRsp + applicationReports .asScala - .filter(appRep => (( - Running.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get) - || New.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get) - ) - && appName.equals(appRep.getName))) - .map(appRep => appRep.getApplicationId) + .filter(applicationReport => isActiveApplication(applicationReport) + && appName.equals(applicationReport.getName)) + .map(applicationReport => applicationReport.getApplicationId) .toList } + def getPreviousApplicationIds(appName: String): List[ApplicationId] = { + val applicationReports = yarnClient.getApplications + + applicationReports + .asScala + .filter(applicationReport => (!(isActiveApplication(applicationReport)) + && appName.equals(applicationReport.getName))) + .map(applicationReport => applicationReport.getApplicationId) + .toList + } + def status(appId: ApplicationId): 
Option[ApplicationStatus] = { val statusResponse = yarnClient.getApplicationReport(appId) - convertState(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus) + info("Got state: %s, final status: %s".format(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)) + toAppStatus(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus) } def kill(appId: ApplicationId) { @@ -271,15 +280,20 @@ class ClientHelper(conf: Configuration) extends Logging { status match { case Some(status) => getAppsRsp .asScala - .filter(appRep => status.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)) + .filter(appRep => status.equals(toAppStatus(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)) .toList case None => getAppsRsp.asScala.toList } } - private def convertState(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = { + private def isActiveApplication(applicationReport: ApplicationReport): Boolean = { + (Running.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get) + || New.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get)) + } + + private def toAppStatus(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = { (state, status) match { - case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish) + case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) | (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) => Some(SuccessfulFinish) case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) => Some(UnsuccessfulFinish) case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New) case _ => Some(Running) 
http://git-wip-us.apache.org/repos/asf/samza/blob/a1e03af0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index 030f914..5230b0f 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -140,7 +140,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { client.status(appId).getOrElse(null) case None => logger.info("Unable to report status because no applicationId could be found.") - null + ApplicationStatus.SuccessfulFinish } } @@ -171,12 +171,17 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { logger.info("Fetching status from YARN for application name %s" format applicationName) val applicationIds = client.getActiveApplicationIds(applicationName) - applicationIds.foreach(applicationId => { - logger.info("Found applicationId %s for applicationName %s" format(applicationId, applicationName)) - }) + if (applicationIds.nonEmpty) { + // Only return latest one, because there should only be one. + logger.info("Matching active ids: " + applicationIds.sorted.reverse.toString()) + applicationIds.sorted.reverse.headOption + } else { + // Couldn't find an active applicationID. Use one the latest finished ID. + val pastApplicationIds = client.getPreviousApplicationIds(applicationName) + // Don't log because there could be many, many previous app IDs for an application. + pastApplicationIds.sorted.reverse.headOption // Get latest + } - // Only return one, because there should only be one. - applicationIds.headOption case None => None }
