Repository: samza Updated Branches: refs/heads/master 6726e1d10 -> 46c25cf90
SAMZA-1246: ApplicationRunner.status() should include exception in case of failure Currently, ApplicationRunner.status() only returns the enum representing the status. It also needs to include the exception if the status is a failure. Author: Xinyu Liu <[email protected]> Reviewers: Jake Maes <[email protected]> Closes #154 from xinyuiscool/SAMZA-1246 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/46c25cf9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/46c25cf9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/46c25cf9 Branch: refs/heads/master Commit: 46c25cf901647ff0b1fce46ce4545034a42a27c9 Parents: 6726e1d Author: Xinyu Liu <[email protected]> Authored: Wed May 3 12:17:22 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Wed May 3 12:17:22 2017 -0700 ---------------------------------------------------------------------- checkstyle/checkstyle-suppressions.xml | 3 + .../org/apache/samza/job/ApplicationStatus.java | 59 ++++++++++++++++++-- .../apache/samza/runtime/ApplicationRunner.java | 2 +- .../samza/runtime/LocalApplicationRunner.java | 3 +- .../samza/runtime/RemoteApplicationRunner.java | 32 +++++++---- .../org/apache/samza/job/local/ProcessJob.scala | 2 +- .../apache/samza/job/yarn/ClientHelper.scala | 22 ++++++-- .../samza/job/yarn/TestClientHelper.scala | 33 +++++++++++ 8 files changed, 130 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/46c25cf9/checkstyle/checkstyle-suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle-suppressions.xml b/checkstyle/checkstyle-suppressions.xml index 428ac93..a88b341 100644 --- a/checkstyle/checkstyle-suppressions.xml +++ b/checkstyle/checkstyle-suppressions.xml @@ -27,5 +27,8 @@ files="TestZkProcessorLatch.java" lines="91-275"/> --> + <suppress 
checks="ConstantName" + files="ApplicationStatus.java" + lines="26-29"/> </suppressions> http://git-wip-us.apache.org/repos/asf/samza/blob/46c25cf9/samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java b/samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java index c41430a..baf095b 100644 --- a/samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java +++ b/samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java @@ -22,16 +22,63 @@ package org.apache.samza.job; /** * Status of a {@link org.apache.samza.job.StreamJob} during and after its run. */ -public enum ApplicationStatus { - Running("Running"), SuccessfulFinish("SuccessfulFinish"), UnsuccessfulFinish("UnsuccessfulFinish"), New("New"); +public class ApplicationStatus { + public static final ApplicationStatus New = new ApplicationStatus(StatusCode.New, null); + public static final ApplicationStatus Running = new ApplicationStatus(StatusCode.Running, null); + public static final ApplicationStatus SuccessfulFinish = new ApplicationStatus(StatusCode.SuccessfulFinish, null); + public static final ApplicationStatus UnsuccessfulFinish = new ApplicationStatus(StatusCode.UnsuccessfulFinish, null); - private final String str; + public enum StatusCode { + New, + Running, + SuccessfulFinish, + UnsuccessfulFinish + } + + private final StatusCode statusCode; + private final Throwable throwable; + + private ApplicationStatus(StatusCode code, Throwable t) { + this.statusCode = code; + this.throwable = t; + } - private ApplicationStatus(String str) { - this.str = str; + public StatusCode getStatusCode() { + return statusCode; } + public Throwable getThrowable() { + return throwable; + } + + @Override public String toString() { - return str; + return statusCode.name(); + } + + + public static ApplicationStatus 
unsuccessfulFinish(Throwable t) { + return new ApplicationStatus(StatusCode.UnsuccessfulFinish, t); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (obj.getClass() != getClass()) { + return false; + } + + ApplicationStatus rhs = (ApplicationStatus) obj; + return statusCode.equals(rhs.statusCode); + } + + @Override + public int hashCode() { + return statusCode.hashCode(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/46c25cf9/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java index b761d86..8fb991a 100644 --- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java +++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java @@ -96,7 +96,7 @@ public abstract class ApplicationRunner { /** * Get the collective status of the Samza jobs represented by {@link StreamApplication}. - * Returns {@link ApplicationStatus#Running} if any of the jobs are running. + * Returns {@link ApplicationRunner} running if all jobs are running. 
* * @param streamApp the user-defined {@link StreamApplication} object * @return the status of the application http://git-wip-us.apache.org/repos/asf/samza/blob/46c25cf9/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index bff0f1c..3efbdc1 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -133,7 +133,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { awaitComplete(); } catch (Throwable t) { - appStatus = ApplicationStatus.UnsuccessfulFinish; + appStatus = ApplicationStatus.unsuccessfulFinish(t); throw new SamzaException("Failed to run application", t); } finally { if (coordination != null) { @@ -247,7 +247,6 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { latch.await(); if (throwable.get() != null) { - appStatus = ApplicationStatus.UnsuccessfulFinish; throw throwable.get(); } else { appStatus = ApplicationStatus.SuccessfulFinish; http://git-wip-us.apache.org/repos/asf/samza/blob/46c25cf9/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index d5f6e21..5daecda 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -84,8 +84,9 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { @Override public 
ApplicationStatus status(StreamApplication app) { try { - boolean finished = false; - boolean unsuccessfulFinish = false; + boolean hasNewJobs = false; + boolean hasRunningJobs = false; + ApplicationStatus unsuccessfulFinishStatus = null; ExecutionPlan plan = getExecutionPlan(app); for (JobConfig jobConfig : plan.getJobConfigs()) { @@ -93,25 +94,36 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { ApplicationStatus status = runner.status(); log.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()}); - switch (status) { + switch (status.getStatusCode()) { + case New: + hasNewJobs = true; + break; case Running: - return ApplicationStatus.Running; + hasRunningJobs = true; + break; case UnsuccessfulFinish: - unsuccessfulFinish = true; + unsuccessfulFinishStatus = status; + break; case SuccessfulFinish: - finished = true; break; default: // Do nothing } } - if (unsuccessfulFinish) { - return ApplicationStatus.UnsuccessfulFinish; - } else if (finished) { + if (hasNewJobs) { + // There are jobs not started, report as New + return ApplicationStatus.New; + } else if (hasRunningJobs) { + // All jobs are started, some are running + return ApplicationStatus.Running; + } else if (unsuccessfulFinishStatus != null) { + // All jobs are finished, some are not successful + return unsuccessfulFinishStatus; + } else { + // All jobs are finished successfully return ApplicationStatus.SuccessfulFinish; } - return ApplicationStatus.New; } catch (Throwable t) { throw new SamzaException("Failed to get status for application", t); } http://git-wip-us.apache.org/repos/asf/samza/blob/46c25cf9/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala index f068773..bc2d74b 100644 --- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala @@ -70,7 +70,7 @@ class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator: JobModelManager def kill: StreamJob = { process.destroyForcibly - jobStatus = Some(UnsuccessfulFinish); + jobStatus = Some(UnsuccessfulFinish) ProcessJob.this } http://git-wip-us.apache.org/repos/asf/samza/blob/46c25cf9/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala index f4fc757..e6c0896 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala @@ -19,6 +19,8 @@ package org.apache.samza.job.yarn + +import org.apache.commons.lang.StringUtils import org.apache.hadoop.fs.permission.FsPermission import org.apache.samza.config.{Config, JobConfig, YarnConfig} import org.apache.samza.coordinator.stream.CoordinatorStreamWriter @@ -260,7 +262,7 @@ class ClientHelper(conf: Configuration) extends Logging { def status(appId: ApplicationId): Option[ApplicationStatus] = { val statusResponse = yarnClient.getApplicationReport(appId) info("Got state: %s, final status: %s".format(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)) - toAppStatus(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus) + toAppStatus(statusResponse) } def kill(appId: ApplicationId) { @@ -280,21 +282,29 @@ class ClientHelper(conf: Configuration) extends Logging { status match { case Some(status) => getAppsRsp .asScala - .filter(appRep => status.equals(toAppStatus(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)) + .filter(appRep => 
status.equals(toAppStatus(appRep).get)) .toList case None => getAppsRsp.asScala.toList } } private def isActiveApplication(applicationReport: ApplicationReport): Boolean = { - (Running.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get) - || New.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get)) + (Running.equals(toAppStatus(applicationReport).get) + || New.equals(toAppStatus(applicationReport).get)) } - private def toAppStatus(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = { + def toAppStatus(applicationReport: ApplicationReport): Option[ApplicationStatus] = { + val state = applicationReport.getYarnApplicationState + val status = applicationReport.getFinalApplicationStatus (state, status) match { case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) | (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) => Some(SuccessfulFinish) - case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) => Some(UnsuccessfulFinish) + case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) => + val diagnostics = applicationReport.getDiagnostics + if (StringUtils.isEmpty(diagnostics)) { + Some(UnsuccessfulFinish) + } else { + Some(ApplicationStatus.unsuccessfulFinish(new SamzaException(diagnostics))) + } case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New) case _ => Some(Running) } http://git-wip-us.apache.org/repos/asf/samza/blob/46c25cf9/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala index 
ad8337b..ee947ae 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala @@ -22,9 +22,15 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.fs.{FileStatus, Path, FileSystem} import org.apache.hadoop.yarn.api.records.ApplicationId +import org.apache.hadoop.yarn.api.records.ApplicationReport +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus +import org.apache.hadoop.yarn.api.records.YarnApplicationState import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.samza.SamzaException import org.apache.samza.config.{MapConfig, JobConfig, YarnConfig} +import org.apache.samza.job.ApplicationStatus +import org.junit.Assert.assertEquals +import org.junit.Assert.assertNotNull import org.mockito.Mockito._ import org.mockito.Matchers.any import org.scalatest.FunSuite @@ -88,4 +94,31 @@ class TestClientHelper extends FunSuite { assert(ret.size == 1) assert(ret.contains("some.keytab")) } + + test("test toAppStatus") { + val appReport = mock[ApplicationReport] + when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.FAILED) + when(appReport.getDiagnostics).thenReturn("some yarn diagnostics") + + var appStatus = clientHelper.toAppStatus(appReport).get + assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish) + assertNotNull(appStatus.getThrowable) + + when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.NEW) + appStatus = clientHelper.toAppStatus(appReport).get + assertEquals(appStatus, ApplicationStatus.New) + + when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED) + when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.FAILED) + appStatus = clientHelper.toAppStatus(appReport).get + assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish) + + 
when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.KILLED) + appStatus = clientHelper.toAppStatus(appReport).get + assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish) + + when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED) + appStatus = clientHelper.toAppStatus(appReport).get + assertEquals(appStatus, ApplicationStatus.SuccessfulFinish) + } }
