Repository: samza Updated Branches: refs/heads/master 171793b69 -> 7a2b4650c
SAMZA-1508: JobRunner should not return success until the job is healthy Author: Jacob Maes <jma...@apache.org> Author: Jacob Maes <jm...@linkedin.com> Reviewers: Prateek Maheshwari <pmahe...@linkedin.com> Closes #367 from jmakes/samza-1508 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7a2b4650 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7a2b4650 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7a2b4650 Branch: refs/heads/master Commit: 7a2b4650cd7c87f0475bc06d306cd4ef834377b0 Parents: 171793b Author: Jacob Maes <jma...@apache.org> Authored: Fri May 18 14:23:07 2018 -0700 Committer: Jacob Maes <--global> Committed: Fri May 18 14:23:07 2018 -0700 ---------------------------------------------------------------------- build.gradle | 1 + .../scala/org/apache/samza/job/JobRunner.scala | 38 +-- .../webapp/ApplicationMasterRestClient.java | 111 +++++++ .../apache/samza/job/yarn/ClientHelper.scala | 54 ++- .../org/apache/samza/job/yarn/YarnJob.scala | 10 +- .../webapp/ApplicationMasterRestServlet.scala | 76 +++-- .../webapp/TestApplicationMasterRestClient.java | 330 +++++++++++++++++++ .../samza/job/yarn/TestClientHelper.scala | 36 +- 8 files changed, 583 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 2f27a03..0b4dae5 100644 --- a/build.gradle +++ b/build.gradle @@ -448,6 +448,7 @@ project(":samza-yarn_$scalaVersion") { compile "org.scala-lang:scala-compiler:$scalaLibVersion" compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion" compile "commons-httpclient:commons-httpclient:$commonsHttpClientVersion" + compile "org.apache.httpcomponents:httpclient:$httpClientVersion" compile("org.apache.hadoop:hadoop-yarn-api:$yarnVersion") { 
exclude module: 'slf4j-log4j12' } http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index 917c018..c6e14f2 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -20,6 +20,8 @@ package org.apache.samza.job +import java.util.concurrent.TimeUnit + import org.apache.samza.SamzaException import org.apache.samza.config.Config import org.apache.samza.config.JobConfig.Config2Job @@ -117,23 +119,11 @@ class JobRunner(config: Config) extends Logging { coordinatorSystemProducer.stop() // Create the actual job, and submit it. - val job = jobFactory.getJob(config).submit - - info("waiting for job to start") - - // Wait until the job has started, then exit. - Option(job.waitForStatus(Running, 500)) match { - case Some(appStatus) => { - if (Running.equals(appStatus)) { - info("job started successfully - " + appStatus) - } else { - warn("unable to start job successfully. job has status %s" format (appStatus)) - } - } - case _ => warn("unable to start job successfully.") - } + val job = jobFactory.getJob(config) + + job.submit() - info("exiting") + info("Job submitted. Check status to determine when it is running.") job } @@ -143,21 +133,7 @@ class JobRunner(config: Config) extends Logging { // Create the actual job, and kill it. val job = jobFactory.getJob(config).kill() - info("waiting for job to terminate") - - // Wait until the job has terminated, then exit. - Option(job.waitForFinish(5000)) match { - case Some(appStatus) => { - if (SuccessfulFinish.equals(appStatus)) { - info("job terminated successfully - " + appStatus) - } else { - warn("unable to terminate job successfully. 
job has status %s" format (appStatus)) - } - } - case _ => warn("unable to terminate job successfully.") - } - - info("exiting") + info("Kill command executed. Check status to determine when it is terminated.") } def status(): ApplicationStatus = { http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-yarn/src/main/java/org/apache/samza/webapp/ApplicationMasterRestClient.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/webapp/ApplicationMasterRestClient.java b/samza-yarn/src/main/java/org/apache/samza/webapp/ApplicationMasterRestClient.java new file mode 100644 index 0000000..eed16db --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/webapp/ApplicationMasterRestClient.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.webapp; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + + +/** + * Client for the {@link ApplicationMasterRestServlet}. + */ +public class ApplicationMasterRestClient implements Closeable { + private final CloseableHttpClient httpClient; + private final HttpHost appMasterHost; + private final ObjectMapper jsonMapper = SamzaObjectMapper.getObjectMapper(); + + public ApplicationMasterRestClient(CloseableHttpClient client, String amHostName, int amRpcPort) { + httpClient = client; + appMasterHost = new HttpHost(amHostName, amRpcPort); + } + + /** + * @return the metrics as a map of groupName to metricName to metricValue. + * @throws IOException if there was an error fetching the metrics from the servlet. + */ + public Map<String, Map<String, Object>> getMetrics() throws IOException { + String jsonString = getEntityAsJson("/metrics", "metrics"); + return jsonMapper.readValue(jsonString, new TypeReference<Map<String, Map<String, Object>>>() {}); + } + + /** + * @return the task context as a map of key to value + * @throws IOException if there was an error fetching the task context from the servlet. 
+ */ + public Map<String, Object> getTaskContext() throws IOException { + String jsonString = getEntityAsJson("/task-context", "task context"); + return jsonMapper.readValue(jsonString, new TypeReference<Map<String, Object>>() {}); + } + + /** + * @return the AM state as a map of key to value + * @throws IOException if there was an error fetching the AM state from the servlet. + */ + public Map<String, Object> getAmState() throws IOException { + String jsonString = getEntityAsJson("/am", "AM state"); + return jsonMapper.readValue(jsonString, new TypeReference<Map<String, Object>>() {}); + } + + /** + * @return the config as a map of key to value + * @throws IOException if there was an error fetching the config from the servlet. + */ + public Map<String, Object> getConfig() throws IOException { + String jsonString = getEntityAsJson("/config", "config"); + return jsonMapper.readValue(jsonString, new TypeReference<Map<String, Object>>() {}); + } + + @Override + public void close() throws IOException { + httpClient.close(); + } + + private String getEntityAsJson(String path, String entityName) throws IOException { + HttpGet getRequest = new HttpGet(path); + HttpResponse httpResponse = httpClient.execute(appMasterHost, getRequest); + + StatusLine status = httpResponse.getStatusLine(); + if (status.getStatusCode() != HttpStatus.SC_OK) { + throw new SamzaException(String.format( + "Error retrieving %s from host %s. 
Response: %s", + entityName, + appMasterHost.toURI(), + status.getReasonPhrase())); + } + + return EntityUtils.toString(httpResponse.getEntity()); + } + + @Override + public String toString() { + return "AppMasterClient for uri: " + appMasterHost.toURI().toString(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala index e6c0896..d193ddb 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala @@ -19,7 +19,6 @@ package org.apache.samza.job.yarn - import org.apache.commons.lang.StringUtils import org.apache.hadoop.fs.permission.FsPermission import org.apache.samza.config.{Config, JobConfig, YarnConfig} @@ -57,6 +56,9 @@ import org.apache.samza.util.Logging import java.io.IOException import java.nio.ByteBuffer +import org.apache.http.impl.client.HttpClientBuilder +import org.apache.samza.webapp.ApplicationMasterRestClient + object ClientHelper { val applicationType = "Samza" @@ -81,6 +83,13 @@ class ClientHelper(conf: Configuration) extends Logging { yarnClient } + private[yarn] def createAmClient(applicationReport: ApplicationReport) = { + val amHostName = applicationReport.getHost + val rpcPort = applicationReport.getRpcPort + + new ApplicationMasterRestClient(HttpClientBuilder.create.build, amHostName, rpcPort) + } + var jobContext: JobContext = null /** @@ -242,8 +251,8 @@ class ClientHelper(conf: Configuration) extends Logging { applicationReports .asScala - .filter(applicationReport => isActiveApplication(applicationReport) - && appName.equals(applicationReport.getName)) + .filter(applicationReport => appName.equals(applicationReport.getName) + && 
isActiveApplication(applicationReport)) .map(applicationReport => applicationReport.getApplicationId) .toList } @@ -253,8 +262,8 @@ class ClientHelper(conf: Configuration) extends Logging { applicationReports .asScala - .filter(applicationReport => (!(isActiveApplication(applicationReport)) - && appName.equals(applicationReport.getName))) + .filter(applicationReport => appName.equals(applicationReport.getName) + && (!isActiveApplication(applicationReport))) .map(applicationReport => applicationReport.getApplicationId) .toList } @@ -305,8 +314,39 @@ class ClientHelper(conf: Configuration) extends Logging { } else { Some(ApplicationStatus.unsuccessfulFinish(new SamzaException(diagnostics))) } - case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New) - case _ => Some(Running) + case (YarnApplicationState.RUNNING, _) => + if (allContainersRunning(applicationReport)) { + Some(Running) + } else { + Some(New) + } + case _ => + Some(New) + } + } + + def allContainersRunning(applicationReport: ApplicationReport): Boolean = { + val amClient: ApplicationMasterRestClient = createAmClient(applicationReport) + + debug("Created client: " + amClient.toString) + + try { + val metrics = amClient.getMetrics + debug("Got metrics: " + metrics.toString) + + val neededContainers = Integer.parseInt( + metrics.get(classOf[SamzaAppMasterMetrics].getCanonicalName) + .get("needed-containers") + .toString) + + info("Needed containers: " + neededContainers) + if (neededContainers == 0) { + true + } else { + false + } + } finally { + amClient.close() } } http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index 3e9d0fe..d335448 100644 --- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -21,9 +21,10 @@ package org.apache.samza.job.yarn import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.records.ApplicationId +import org.apache.samza.SamzaException import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.{Config, JobConfig, ShellCommandConfig, YarnConfig} -import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish, UnsuccessfulFinish} +import org.apache.samza.job.ApplicationStatus.{SuccessfulFinish, UnsuccessfulFinish} import org.apache.samza.job.{ApplicationStatus, StreamJob} import org.apache.samza.serializers.model.SamzaObjectMapper import org.apache.samza.util.{CoordinatorStreamUtil, Util} @@ -115,7 +116,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { Thread.sleep(1000) } - Running + getStatus } def waitForStatus(status: ApplicationStatus, timeoutMs: Long): ApplicationStatus = { @@ -130,14 +131,15 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { Thread.sleep(1000) } - Running + getStatus } def getStatus: ApplicationStatus = { getAppId match { case Some(appId) => logger.info("Getting status for applicationId %s" format appId) - client.status(appId).getOrElse(null) + client.status(appId).getOrElse( + throw new SamzaException("No status was determined for applicationId %s" format appId)) case None => logger.info("Unable to report status because no applicationId could be found.") ApplicationStatus.SuccessfulFinish http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala index 122a1df..5c10987 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala @@ -19,44 +19,41 @@ package org.apache.samza.webapp +import java.{lang, util} + import org.apache.samza.clustermanager.SamzaApplicationState import org.scalatra._ import scalate.ScalateSupport import org.apache.samza.config.Config -import org.apache.samza.job.yarn.{YarnAppState, ClientHelper} +import org.apache.samza.job.yarn.{ClientHelper, YarnAppState} import org.apache.samza.metrics._ + import scala.collection.JavaConverters._ import org.apache.hadoop.yarn.conf.YarnConfiguration import java.util.HashMap -import org.apache.samza.serializers.model.SamzaObjectMapper -class ApplicationMasterRestServlet(samzaConfig: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport { - val yarnConfig = new YarnConfiguration - val client = new ClientHelper(yarnConfig) - val jsonMapper = SamzaObjectMapper.getObjectMapper - - before() { - contentType = "application/json" - } +import org.apache.samza.serializers.model.SamzaObjectMapper +import org.codehaus.jackson.map.ObjectMapper - get("/metrics") { - val metricMap = new HashMap[String, java.util.Map[String, Object]] +object ApplicationMasterRestServlet { + def getMetrics(jsonMapper: ObjectMapper, metricsRegistry: ReadableMetricsRegistry) = { + val metricMap = new HashMap[String, util.Map[String, Object]] // build metric map - registry.getGroups.asScala.foreach(group => { + metricsRegistry.getGroups.asScala.foreach(group => { val groupMap = new HashMap[String, Object] - registry.getGroup(group).asScala.foreach { + metricsRegistry.getGroup(group).asScala.foreach { case (name, metric) => metric.visit(new MetricsVisitor() { def counter(counter: 
Counter) = - groupMap.put(counter.getName, counter.getCount: java.lang.Long) + groupMap.put(counter.getName, counter.getCount: lang.Long) def gauge[T](gauge: Gauge[T]) = - groupMap.put(gauge.getName, gauge.getValue.asInstanceOf[java.lang.Object]) + groupMap.put(gauge.getName, gauge.getValue.asInstanceOf[Object]) def timer(timer: Timer) = - groupMap.put(timer.getName, timer.getSnapshot().getAverage: java.lang.Double) + groupMap.put(timer.getName, timer.getSnapshot().getAverage: lang.Double) }) } @@ -66,18 +63,18 @@ class ApplicationMasterRestServlet(samzaConfig: Config, samzaAppState: SamzaAppl jsonMapper.writeValueAsString(metricMap) } - get("/task-context") { + def getTaskContext(jsonMapper: ObjectMapper, state: YarnAppState) = { // sick of fighting with scala.. just using java map for now val contextMap = new HashMap[String, Object] - contextMap.put("task-id", state.taskId: java.lang.Integer) + contextMap.put("task-id", state.taskId: Integer) contextMap.put("name", state.amContainerId.toString) jsonMapper.writeValueAsString(contextMap) } - get("/am") { - val containers = new HashMap[String, HashMap[String, Object]] + def getAmState(jsonMapper: ObjectMapper, samzaAppState: SamzaApplicationState, state: YarnAppState) = { + val containers = new HashMap[String, util.HashMap[String, Object]] state.runningYarnContainers.asScala.foreach { case (containerId, container) => @@ -101,7 +98,42 @@ class ApplicationMasterRestServlet(samzaConfig: Config, samzaAppState: SamzaAppl jsonMapper.writeValueAsString(new HashMap[String, Object](status.asJava)) } - get("/config") { + def getConfig(jsonMapper: ObjectMapper, samzaConfig: Config) = { jsonMapper.writeValueAsString(new HashMap[String, Object](samzaConfig.sanitize)) } } + +/** + * Defines the Scalatra routes for the servlet. 
+ */ +class ApplicationMasterRestServlet(samzaConfig: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport { + val yarnConfig = new YarnConfiguration + val client = new ClientHelper(yarnConfig) + val jsonMapper = SamzaObjectMapper.getObjectMapper + + before() { + contentType = "application/json" + } + + get("/metrics") { + ApplicationMasterRestServlet.getMetrics(jsonMapper, registry) + } + + + + get("/task-context") { + ApplicationMasterRestServlet.getTaskContext(jsonMapper, state) + } + + + + get("/am") { + ApplicationMasterRestServlet.getAmState(jsonMapper, samzaAppState, state) + } + + + + get("/config") { + ApplicationMasterRestServlet.getConfig(jsonMapper, samzaConfig) + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java new file mode 100644 index 0000000..dbe534f --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.webapp; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.io.StringReader; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.commons.io.input.ReaderInputStream; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.clustermanager.SamzaApplicationState; +import org.apache.samza.clustermanager.SamzaResource; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.container.grouper.task.GroupByContainerCount; +import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.job.yarn.SamzaAppMasterMetrics; 
+import org.apache.samza.job.yarn.YarnAppState; +import org.apache.samza.job.yarn.YarnContainer; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +public class TestApplicationMasterRestClient { + private static final String AM_HOST_NAME = "dummyHost"; + private static final int AM_RPC_PORT = 1337; + private static final int AM_HTTP_PORT = 7001; + private static final String YARN_CONTAINER_ID_1 = "container_e38_1510966221296_0007_01_000001"; + private static final String YARN_CONTAINER_ID_2 = "container_e38_1510966221296_0007_01_000002"; + private static final String YARN_CONTAINER_ID_3 = "container_e38_1510966221296_0007_01_000003"; + private static final String APP_ATTEMPT_ID = "appattempt_1510966221296_0007_000001"; + + private final ObjectMapper jsonMapper = SamzaObjectMapper.getObjectMapper(); + + private CloseableHttpClient mockClient; + + @Before + public void setup() { + mockClient = mock(CloseableHttpClient.class); + } + + @Rule + public ExpectedException expectedException = ExpectedException.none(); // Enables us to verify the exception message + + @Test + public void testGetMetricsSuccess() throws IOException { + SamzaApplicationState samzaAppState = createSamzaApplicationState(); + + MetricsRegistryMap registry = new MetricsRegistryMap(); + assignMetricValues(samzaAppState, registry); + + String response = ApplicationMasterRestServlet.getMetrics(jsonMapper, registry); + setupMockClientResponse(HttpStatus.SC_OK, "Success", response); + + ApplicationMasterRestClient client = new 
ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); + Map<String, Map<String, Object>> metricsResult = client.getMetrics(); + + String group = SamzaAppMasterMetrics.class.getCanonicalName(); + assertEquals(1, metricsResult.size()); + assertTrue(metricsResult.containsKey(group)); + + Map<String, Object> amMetricsGroup = metricsResult.get(group); + assertEquals(7, amMetricsGroup.size()); + assertEquals(samzaAppState.runningContainers.size(), amMetricsGroup.get("running-containers")); + assertEquals(samzaAppState.neededContainers.get(), amMetricsGroup.get("needed-containers")); + assertEquals(samzaAppState.completedContainers.get(), amMetricsGroup.get("completed-containers")); + assertEquals(samzaAppState.failedContainers.get(), amMetricsGroup.get("failed-containers")); + assertEquals(samzaAppState.releasedContainers.get(), amMetricsGroup.get("released-containers")); + assertEquals(samzaAppState.containerCount.get(), amMetricsGroup.get("container-count")); + assertEquals(samzaAppState.jobHealthy.get() ? 
1 : 0, amMetricsGroup.get("job-healthy")); + } + + @Test + public void testGetMetricsError() throws IOException { + setupErrorTest("metrics"); + + ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); + client.getMetrics(); + } + + @Test + public void testGetTaskContextSuccess() throws IOException { + ContainerId containerId = ConverterUtils.toContainerId(YARN_CONTAINER_ID_1); + YarnAppState yarnAppState = createYarnAppState(containerId); + + String response = ApplicationMasterRestServlet.getTaskContext(jsonMapper, yarnAppState); + setupMockClientResponse(HttpStatus.SC_OK, "Success", response); + + ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); + Map<String, Object> taskContextResult = client.getTaskContext(); + + assertEquals(2, taskContextResult.size()); + assertEquals(2, taskContextResult.get("task-id")); + assertEquals(containerId.toString(), taskContextResult.get("name")); + } + + @Test + public void testTaskContextError() throws IOException { + setupErrorTest("task context"); + + ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); + client.getTaskContext(); + } + + @Test + public void testGetAmStateSuccess() throws IOException { + SamzaApplicationState samzaAppState = createSamzaApplicationState(); + + ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(APP_ATTEMPT_ID); + ContainerId containerId = ConverterUtils.toContainerId(YARN_CONTAINER_ID_1); + YarnAppState yarnAppState = createYarnAppState(containerId); + + String response = ApplicationMasterRestServlet.getAmState(jsonMapper, samzaAppState, yarnAppState); + setupMockClientResponse(HttpStatus.SC_OK, "Success", response); + + ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); + Map<String, Object> amStateResult = client.getAmState(); + + assertEquals(4, 
amStateResult.size()); + assertEquals(String.format("%s:%s", yarnAppState.nodeHost, yarnAppState.rpcUrl.getPort()), amStateResult.get("host")); + assertEquals(containerId.toString(), amStateResult.get("container-id")); + // Can only validate the keys because up-time changes every time it's requested + assertEquals(buildExpectedContainerResponse(yarnAppState.runningYarnContainers, samzaAppState).keySet(), + ((Map<String, Object>) amStateResult.get("containers")).keySet()); + assertEquals(attemptId.toString(), amStateResult.get("app-attempt-id")); + } + + @Test + public void testGetAmStateError() throws IOException { + setupErrorTest("AM state"); + + ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); + client.getAmState(); + } + + @Test + public void testGetConfigSuccess() throws IOException { + SamzaApplicationState samzaAppState = createSamzaApplicationState(); + + Map<String, String> configMap = ImmutableMap.of("key1", "value1", "key2", "value2"); + Config config = new MapConfig(configMap); + + String response = ApplicationMasterRestServlet.getConfig(jsonMapper, config); + setupMockClientResponse(HttpStatus.SC_OK, "Success", response); + + ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); + Map<String, Object> configResult = client.getConfig(); + + assertEquals(configMap, configResult); + } + + @Test + public void testGetConfigError() throws IOException { + setupErrorTest("config"); + + ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); + client.getConfig(); + } + + @Test + public void testCloseMethodClosesHttpClient() throws IOException { + ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); + client.close(); + + verify(mockClient).close(); + } + + private void setupMockClientResponse(int statusCode, String statusReason, String 
responseBody) throws IOException { + StatusLine statusLine = mock(StatusLine.class); + when(statusLine.getStatusCode()).thenReturn(statusCode); + when(statusLine.getReasonPhrase()).thenReturn(statusReason); + + HttpEntity entity = mock(HttpEntity.class); + when(entity.getContent()).thenReturn(new ReaderInputStream(new StringReader(responseBody))); + + CloseableHttpResponse response = mock(CloseableHttpResponse.class); + when(response.getStatusLine()).thenReturn(statusLine); + when(response.getEntity()).thenReturn(entity); + + when(mockClient.execute(any(HttpHost.class), any(HttpGet.class))).thenReturn(response); + } + + private SamzaApplicationState createSamzaApplicationState() { + HashMap<String, ContainerModel> containers = generateContainers(); + + JobModel mockJobModel = mock(JobModel.class); + when(mockJobModel.getContainers()).thenReturn(containers); + JobModelManager mockJobModelManager = mock(JobModelManager.class); + when(mockJobModelManager.jobModel()).thenReturn(mockJobModel); + + SamzaApplicationState samzaApplicationState = new SamzaApplicationState(mockJobModelManager); + + samzaApplicationState.runningContainers.put(YARN_CONTAINER_ID_3, + new SamzaResource(1, 2, "dummyNodeHost1", "dummyResourceId1")); + samzaApplicationState.runningContainers.put(YARN_CONTAINER_ID_2, + new SamzaResource(2, 4, "dummyNodeHost2", "dummyResourceId2")); + return samzaApplicationState; + } + + private YarnAppState createYarnAppState(ContainerId containerId) throws MalformedURLException { + YarnAppState yarnAppState = new YarnAppState(2, containerId, AM_HOST_NAME, AM_RPC_PORT, AM_HTTP_PORT); + yarnAppState.rpcUrl = new URL(new HttpHost(AM_HOST_NAME, AM_RPC_PORT).toURI()); + yarnAppState.runningYarnContainers.put("0", new YarnContainer(Container.newInstance( + ConverterUtils.toContainerId(YARN_CONTAINER_ID_2), + ConverterUtils.toNodeIdWithDefaultPort("dummyNodeHost1"), + "dummyNodeHttpHost1", + null, + null, + null + ))); + yarnAppState.runningYarnContainers.put("1", new 
YarnContainer(Container.newInstance( + ConverterUtils.toContainerId(YARN_CONTAINER_ID_3), + ConverterUtils.toNodeIdWithDefaultPort("dummyNodeHost2"), + "dummyNodeHttpHost2", + null, + null, + null + ))); + return yarnAppState; + } + + private HashMap<String, ContainerModel> generateContainers() { + Set<TaskModel> taskModels = ImmutableSet.of( + new TaskModel(new TaskName("task1"), + ImmutableSet.of(new SystemStreamPartition(new SystemStream("system1", "stream1"), new Partition(0))), + new Partition(0)), + new TaskModel(new TaskName("task2"), + ImmutableSet.of(new SystemStreamPartition(new SystemStream("system1", "stream1"), new Partition(1))), + new Partition(1))); + GroupByContainerCount grouper = new GroupByContainerCount(2); + Set<ContainerModel> containerModels = grouper.group(taskModels); + HashMap<String, ContainerModel> containers = new HashMap<>(); + for (ContainerModel containerModel : containerModels) { + containers.put(containerModel.getProcessorId(), containerModel); + } + return containers; + } + + private Map<String, Map<String, Object>> buildExpectedContainerResponse(Map<String, YarnContainer> runningYarnContainers, + SamzaApplicationState samzaAppState) throws IOException { + Map<String, Map<String, Object>> containers = new HashMap<>(); + + runningYarnContainers.forEach((containerId, container) -> { + String yarnContainerId = container.id().toString(); + Map<String, Object> containerMap = new HashMap(); + Map<TaskName, TaskModel> taskModels = samzaAppState.jobModelManager.jobModel().getContainers().get(containerId).getTasks(); + containerMap.put("yarn-address", container.nodeHttpAddress()); + containerMap.put("start-time", String.valueOf(container.startTime())); + containerMap.put("up-time", String.valueOf(container.upTime())); + containerMap.put("task-models", taskModels); + containerMap.put("container-id", containerId); + containers.put(yarnContainerId, containerMap); + }); + + return 
jsonMapper.readValue(jsonMapper.writeValueAsString(containers), new TypeReference<Map<String, Map<String, Object>>>() {}); + } + + private void assignMetricValues(SamzaApplicationState samzaAppState, MetricsRegistryMap registry) { + SamzaAppMasterMetrics metrics = new SamzaAppMasterMetrics(new MapConfig(), samzaAppState, registry); + metrics.start(); + samzaAppState.runningContainers.put("dummyContainer", + new SamzaResource(1, 2, AM_HOST_NAME, "dummyResourceId")); // 1 container + samzaAppState.neededContainers.set(2); + samzaAppState.completedContainers.set(3); + samzaAppState.failedContainers.set(4); + samzaAppState.releasedContainers.set(5); + samzaAppState.containerCount.set(6); + samzaAppState.jobHealthy.set(true); + } + + private void setupErrorTest(String entityToFetch) throws IOException { + String statusReason = "Dummy status reason"; + expectedException.expect(SamzaException.class); + expectedException.expectMessage(String.format( + "Error retrieving %s from host %s. Response: %s", + entityToFetch, + new HttpHost(AM_HOST_NAME, AM_RPC_PORT).toURI(), + statusReason)); + + setupMockClientResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, statusReason, ""); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala index ee947ae..6df6589 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala @@ -20,19 +20,16 @@ package org.apache.samza.job.yarn import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.fs.{FileStatus, Path, FileSystem} -import 
org.apache.hadoop.yarn.api.records.ApplicationId -import org.apache.hadoop.yarn.api.records.ApplicationReport -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus -import org.apache.hadoop.yarn.api.records.YarnApplicationState +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, FinalApplicationStatus, YarnApplicationState} import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.samza.SamzaException -import org.apache.samza.config.{MapConfig, JobConfig, YarnConfig} +import org.apache.samza.config.{JobConfig, MapConfig, YarnConfig} import org.apache.samza.job.ApplicationStatus -import org.junit.Assert.assertEquals -import org.junit.Assert.assertNotNull -import org.mockito.Mockito._ +import org.apache.samza.webapp.ApplicationMasterRestClient +import org.junit.Assert.{assertEquals, assertNotNull} import org.mockito.Matchers.any +import org.mockito.Mockito._ import org.scalatest.FunSuite import org.scalatest.mockito.MockitoSugar @@ -40,11 +37,15 @@ import org.scalatest.mockito.MockitoSugar class TestClientHelper extends FunSuite { import MockitoSugar._ val hadoopConfig = mock[Configuration] + val mockAmClient = mock[ApplicationMasterRestClient] val clientHelper = new ClientHelper(hadoopConfig) { override def createYarnClient() = { mock[YarnClient] } + override def createAmClient(applicationReport: ApplicationReport) = { + mockAmClient + } } test("test validateJobConfig") { @@ -120,5 +121,22 @@ class TestClientHelper extends FunSuite { when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED) appStatus = clientHelper.toAppStatus(appReport).get assertEquals(appStatus, ApplicationStatus.SuccessfulFinish) + + val appMasterMetrics = new java.util.HashMap[String, Object]() + val metrics = new java.util.HashMap[String, java.util.Map[String, Object]]() + metrics.put(classOf[SamzaAppMasterMetrics].getCanonicalName(), appMasterMetrics) + 
appMasterMetrics.put("needed-containers", "1") + when(mockAmClient.getMetrics).thenReturn(metrics) + + when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.RUNNING) + appStatus = clientHelper.toAppStatus(appReport).get + assertEquals(appStatus, ApplicationStatus.New) // Should not be RUNNING if there are still needed containers + + appMasterMetrics.put("needed-containers", "0") + when(mockAmClient.getMetrics).thenReturn(metrics) + + when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.RUNNING) + appStatus = clientHelper.toAppStatus(appReport).get + assertEquals(appStatus, ApplicationStatus.Running) // Should be RUNNING once there are no more needed containers } }