Repository: samza Updated Branches: refs/heads/master 5f81b8d13 -> 85184d05b
SAMZA-1470: Wrong job status returned by YarnRestJobStatusProvider when there are multiple app Author: Jacob Maes <[email protected]> Reviewers: Jagadish <[email protected]> Closes #339 from jmakes/samza-1470-4 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/85184d05 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/85184d05 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/85184d05 Branch: refs/heads/master Commit: 85184d05b99a698c08b120914060a3240ea03d2b Parents: 5f81b8d Author: Jacob Maes <[email protected]> Authored: Wed Oct 25 15:18:11 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Wed Oct 25 15:18:11 2017 -0700 ---------------------------------------------------------------------- .../rest/model/yarn/YarnApplicationInfo.java | 12 ++- .../proxy/job/YarnRestJobStatusProvider.java | 43 ++++++----- .../job/TestYarnRestJobStatusProvider.java | 77 ++++++++++++++++++++ 3 files changed, 107 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/85184d05/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java b/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java index 1c7f757..8d55c89 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java @@ -18,6 +18,7 @@ */ package org.apache.samza.rest.model.yarn; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.samza.rest.proxy.job.JobInstance; @@ -40,14 +41,11 @@ public class YarnApplicationInfo { } /** - * Returns a Map with all the apps 
and their names as the key. + * + * @return the full list of Yarn applications. There will likely be more than one per job-instance. */ - public Map<String, YarnApplication> getApplications() { - Map<String, YarnApplication> applications = new HashMap<>(); - for (YarnApplication app: this.apps) { - applications.put(app.getName(), app); - } - return applications; + public List<YarnApplication> getYarnApplications() { + return Collections.unmodifiableList(apps); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/85184d05/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java index 63a1ae4..a28e4b2 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java @@ -29,9 +29,10 @@ import org.apache.commons.httpclient.HttpStatus; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.samza.SamzaException; -import org.apache.samza.rest.model.yarn.YarnApplicationInfo; import org.apache.samza.rest.model.Job; import org.apache.samza.rest.model.JobStatus; +import org.apache.samza.rest.model.yarn.YarnApplicationInfo; +import org.apache.samza.rest.model.yarn.YarnApplicationInfo.YarnApplication; import org.apache.samza.rest.resources.JobsResourceConfig; import org.apache.samza.rest.resources.YarnJobResourceConfig; import org.codehaus.jackson.map.DeserializationConfig; @@ -39,7 +40,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * An implementation of the {@link JobStatusProvider} that retrieves * the job 
status from the YARN REST api. @@ -52,12 +52,12 @@ public class YarnRestJobStatusProvider implements JobStatusProvider { private final String apiEndpoint; private final HttpClient httpClient; - public YarnRestJobStatusProvider(JobsResourceConfig config) { + YarnRestJobStatusProvider(JobsResourceConfig config) { YarnJobResourceConfig yarnConfig = new YarnJobResourceConfig(config); + this.httpClient = new HttpClient(); + this.apiEndpoint = String.format("http://%s/ws/v1/cluster/apps", yarnConfig.getYarnResourceManagerEndpoint()); OBJECT_MAPPER.configure(DeserializationConfig.Feature.UNWRAP_ROOT_VALUE, true); - this.apiEndpoint = String.format("http://%s/ws/v1/cluster/apps", - yarnConfig.getYarnResourceManagerEndpoint()); } @Override @@ -66,21 +66,28 @@ public class YarnRestJobStatusProvider implements JobStatusProvider { if (jobs == null || jobs.isEmpty()) { return; } + + // We will identify the YARN application states by their qualified names, so build a map + // to translate back from that name to the JobInfo we wish to populate. 
+ final Map<String, Job> qualifiedJobToInfo = new HashMap<>(); + for(Job job : jobs) { + qualifiedJobToInfo.put(YarnApplicationInfo.getQualifiedJobName(new JobInstance(job.getJobName(), job.getJobId())), job); + } + try { byte[] response = httpGet(apiEndpoint); YarnApplicationInfo yarnApplicationInfo = OBJECT_MAPPER.readValue(response, YarnApplicationInfo.class); - Map<String, YarnApplicationInfo.YarnApplication> yarnApplications = yarnApplicationInfo.getApplications(); - for (Job job: jobs) { - String qualifiedJobName = YarnApplicationInfo.getQualifiedJobName(new JobInstance(job.getJobName(), job.getJobId())); - YarnApplicationInfo.YarnApplication yarnApp = yarnApplications.get(qualifiedJobName); - if (yarnApp == null) { - job.setStatusDetail(JobStatus.UNKNOWN.toString()); - job.setStatus(JobStatus.UNKNOWN); - continue; - } - JobStatus samzaStatus = yarnStateToSamzaStatus(YarnApplicationState.valueOf(yarnApp.getState().toUpperCase())); - if (job.getStatusDetail() == null || samzaStatus != JobStatus.STOPPED) { - job.setStatusDetail(yarnApp.getState()); + + // There can be multiple Yarn apps for each qualified job name, so we iterate the former and match with latter. + for (YarnApplication app: yarnApplicationInfo.getYarnApplications()) { + Job job = qualifiedJobToInfo.get(app.getName()); + JobStatus samzaStatus = yarnStateToSamzaStatus(YarnApplicationState.valueOf(app.getState().toUpperCase())); + + // If job is null, it wasn't requested. The default statusDetail is null so always update in that case. + // Only update the job status if the current status is not STOPPED because there could be many + // application attempts for the job, and we're interested in the RUNNING one if it exists. 
+ if (job != null && (job.getStatusDetail() == null || samzaStatus != JobStatus.STOPPED)) { + job.setStatusDetail(app.getState()); job.setStatus(samzaStatus); } } @@ -126,7 +133,7 @@ public class YarnRestJobStatusProvider implements JobStatusProvider { * @return the response * @throws IOException if there are problems with the http get request. */ - private byte[] httpGet(String requestUrl) + byte[] httpGet(String requestUrl) throws IOException { GetMethod getMethod = new GetMethod(requestUrl); try { http://git-wip-us.apache.org/repos/asf/samza/blob/85184d05/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java b/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java new file mode 100644 index 0000000..0875a6f --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.proxy.job; + +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import junit.framework.TestCase; +import org.apache.samza.config.MapConfig; +import org.apache.samza.rest.model.Job; +import org.apache.samza.rest.model.JobStatus; +import org.apache.samza.rest.resources.JobsResourceConfig; +import org.junit.Test; + +import static org.mockito.Mockito.*; + + +public class TestYarnRestJobStatusProvider extends TestCase { + private YarnRestJobStatusProvider provider; + + private static final String APPS_RESPONSE = + "{\"apps\":{\"app\":[{\"id\":\"application_1502919535296_0161\",\"name\":\"job1_1\",\"state\":\"KILLED\",\"finalStatus\":\"KILLED\",\"applicationType\":\"Samza\"}," + + "{\"id\":\"application_1502919535296_0163\",\"name\":\"job1_1\",\"state\":\"RUNNING\",\"finalStatus\":\"UNDEFINED\",\"applicationType\":\"Samza\"}," + + "{\"id\":\"application_1502919535296_0162\",\"name\":\"job1_1\",\"state\":\"KILLED\",\"finalStatus\":\"KILLED\",\"applicationType\":\"Samza\"}," + + "{\"id\":\"application_1502919535296_0165\",\"name\":\"job2_1\",\"state\":\"KILLED\",\"finalStatus\":\"KILLED\",\"applicationType\":\"Samza\"}," + + "{\"id\":\"application_1502919535296_0164\",\"name\":\"job3_1\",\"state\":\"RUNNING\",\"finalStatus\":\"UNDEFINED\",\"applicationType\":\"Samza\"}]}}"; + + @Override + protected void setUp() throws Exception { + super.setUp(); + + provider = spy(new YarnRestJobStatusProvider(new JobsResourceConfig(new MapConfig()))); + } + + @Test + public void testGetJobStatuses() throws IOException, InterruptedException { + doReturn(APPS_RESPONSE.getBytes()).when(provider).httpGet(anyString()); + + List<Job> jobs = Lists.newArrayList( + new Job("job1", "1"), // Job with multiple applications, 1 RUNNING + new Job("job2", "1"), // Job with 1 KILLED application + new Job("job3", "1"), // Job with 1 RUNNING application + new Job("job4", "1")); // Job not found 
in YARN + provider.getJobStatuses(jobs); + + Collections.sort(jobs, (o1, o2) -> o1.getJobName().compareTo(o2.getJobName())); + + assertEquals(4, jobs.size()); + verifyJobStatus(jobs.get(0), "job1", JobStatus.STARTED, "RUNNING"); + verifyJobStatus(jobs.get(1), "job2", JobStatus.STOPPED, "KILLED"); + verifyJobStatus(jobs.get(2), "job3", JobStatus.STARTED, "RUNNING"); + verifyJobStatus(jobs.get(3), "job4", JobStatus.UNKNOWN, null); + } + + private void verifyJobStatus(Job job, String jobName, JobStatus samzaStatus, String yarnStatus) { + assertEquals(jobName, job.getJobName()); + assertEquals(samzaStatus, job.getStatus()); + assertEquals(yarnStatus, job.getStatusDetail()); + } +}
