This is an automated email from the ASF dual-hosted git repository.
ztang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new bb528e3 YARN-10463: For Federation, we should support
getApplicationAttemptReport. (#2563)
bb528e3 is described below
commit bb528e32399524edc567b1af91f8906218917e9a
Author: zhuqi <[email protected]>
AuthorDate: Mon Dec 21 10:04:16 2020 +0800
YARN-10463: For Federation, we should support getApplicationAttemptReport.
(#2563)
Qi Zhu via Zhankun Tang
---
.../hadoop/yarn/server/router/RouterMetrics.java | 37 +++++++-
.../clientrm/FederationClientInterceptor.java | 70 +++++++++++++-
.../yarn/server/router/TestRouterMetrics.java | 56 +++++++++++
.../clientrm/TestFederationClientInterceptor.java | 102 ++++++++++++++++++++-
4 files changed, 262 insertions(+), 3 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 884e06e..24fdbb90 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -51,6 +51,8 @@ public final class RouterMetrics {
private MutableGaugeInt numAppsFailedRetrieved;
@Metric("# of multiple applications reports failed to be retrieved")
private MutableGaugeInt numMultipleAppsFailedRetrieved;
+ @Metric("# of applicationAttempt reports failed to be retrieved")
+ private MutableGaugeInt numAppAttemptsFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@@ -64,6 +66,10 @@ public final class RouterMetrics {
@Metric("Total number of successful Retrieved multiple apps reports and "
+ "latency(ms)")
private MutableRate totalSucceededMultipleAppsRetrieved;
+ @Metric("Total number of successful Retrieved " +
+ "appAttempt reports and latency(ms)")
+ private MutableRate totalSucceededAppAttemptsRetrieved;
+
/**
* Provide quantile counters for all latencies.
@@ -73,6 +79,7 @@ public final class RouterMetrics {
private MutableQuantiles killApplicationLatency;
private MutableQuantiles getApplicationReportLatency;
private MutableQuantiles getApplicationsReportLatency;
+ private MutableQuantiles getApplicationAttemptReportLatency;
private static volatile RouterMetrics INSTANCE = null;
private static MetricsRegistry registry;
@@ -92,6 +99,10 @@ public final class RouterMetrics {
getApplicationsReportLatency =
registry.newQuantiles("getApplicationsReportLatency",
"latency of get applications report", "ops", "latency", 10);
+ getApplicationAttemptReportLatency =
+ registry.newQuantiles("getApplicationAttemptReportLatency",
+ "latency of get applicationattempt " +
+ "report", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@@ -134,6 +145,11 @@ public final class RouterMetrics {
}
@VisibleForTesting
+ public long getNumSucceededAppAttemptsRetrieved() {
+ // Sample count reported by lastStat() of the rate that
+ // succeededAppAttemptsRetrieved(long) adds its durations to.
+ return totalSucceededAppAttemptsRetrieved.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
public long getNumSucceededMultipleAppsRetrieved() {
return totalSucceededMultipleAppsRetrieved.lastStat().numSamples();
}
@@ -154,6 +170,11 @@ public final class RouterMetrics {
}
@VisibleForTesting
+ public double getLatencySucceededGetAppAttemptReport() {
+ // Mean reported by lastStat() of totalSucceededAppAttemptsRetrieved; its
+ // samples are the durations passed to succeededAppAttemptsRetrieved(long).
+ return totalSucceededAppAttemptsRetrieved.lastStat().mean();
+ }
+
+ @VisibleForTesting
public double getLatencySucceededGetAppReport() {
return totalSucceededAppsRetrieved.lastStat().mean();
}
@@ -184,6 +205,11 @@ public final class RouterMetrics {
}
@VisibleForTesting
+  public int getAppAttemptsFailedRetrieved() {
+    // Read the gauge that incrAppAttemptsFailedRetrieved() increments.
+    // numAppsFailedRetrieved tracks failed application-report lookups, so
+    // returning it here would misreport attempt-report failures.
+    return numAppAttemptsFailedRetrieved.value();
+  }
+
+ @VisibleForTesting
public int getMultipleAppsFailedRetrieved() {
return numMultipleAppsFailedRetrieved.value();
}
@@ -213,6 +239,11 @@ public final class RouterMetrics {
getApplicationsReportLatency.add(duration);
}
+  /**
+   * Records one successful getApplicationAttemptReport call.
+   *
+   * @param duration value added to both the latency quantiles and the
+   *          aggregate success rate; the {@code @Metric} description of
+   *          the rate labels it as a latency in milliseconds
+   */
+  public void succeededAppAttemptsRetrieved(long duration) {
+    getApplicationAttemptReportLatency.add(duration);
+    totalSucceededAppAttemptsRetrieved.add(duration);
+  }
+
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@@ -233,4 +264,8 @@ public final class RouterMetrics {
numMultipleAppsFailedRetrieved.incr();
}
-}
\ No newline at end of file
+ public void incrAppAttemptsFailedRetrieved() {
+ // Counts one failed appAttempt-report retrieval on the dedicated gauge.
+ numAppAttemptsFailedRetrieved.incr();
+ }
+
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index a721fe0..7e8e7af 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -749,11 +749,79 @@ public class FederationClientInterceptor
throw new NotImplementedException("Code is not implemented");
}
+  /**
+   * Returns the report of one application attempt. The YARN Router will
+   * forward the request to the respective YARN RM in which the AM is
+   * running, i.e. the home sub-cluster of the application that owns the
+   * attempt.
+   *
+   * Possible failure:
+   *
+   * Client: identical behavior as {@code ClientRMService}.
+   *
+   * Router: the Client will timeout and resubmit the request.
+   *
+   * ResourceManager: the Router will timeout and the call will fail.
+   *
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   *
+   * @param request carries the ApplicationAttemptId to look up; the
+   *          request, the attempt id and its application id must all be
+   *          non-null
+   * @return the response of the home sub-cluster; may be {@code null} when
+   *         the sub-cluster returned nothing (an error is logged)
+   * @throws YarnException if required request information is missing, if
+   *           the home sub-cluster lookup fails, or if the forwarded call
+   *           fails with a YarnException
+   * @throws IOException if the forwarded call fails with an IOException
+   */
   @Override
   public GetApplicationAttemptReportResponse getApplicationAttemptReport(
       GetApplicationAttemptReportRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long startTime = clock.getTime();
+
+    // Reject incomplete requests up front and count them as failures.
+    if (request == null || request.getApplicationAttemptId() == null
+        || request.getApplicationAttemptId().getApplicationId() == null) {
+      routerMetrics.incrAppAttemptsFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing getApplicationAttemptReport " +
+          "request or applicationId " +
+          "or applicationAttemptId information.",
+          null);
+    }
+
+    SubClusterId subClusterId = null;
+
+    // Resolve the home sub-cluster of the owning application.
+    try {
+      subClusterId = federationFacade
+          .getApplicationHomeSubCluster(
+              request.getApplicationAttemptId().getApplicationId());
+    } catch (YarnException e) {
+      routerMetrics.incrAppAttemptsFailedRetrieved();
+      // NOTE(review): there is no space before "belongs", but
+      // testGetApplicationAttemptNotExists matches this text verbatim, so
+      // the message must only be changed together with that test.
+      RouterServerUtil
+          .logAndThrowException("ApplicationAttempt " +
+              request.getApplicationAttemptId() +
+              "belongs to Application " +
+              request.getApplicationAttemptId().getApplicationId() +
+              " does not exist in FederationStateStore", e);
+    }
+
+    ApplicationClientProtocol clientRMProxy =
+        getClientRMProxyForSubCluster(subClusterId);
+
+    GetApplicationAttemptReportResponse response = null;
+    try {
+      response = clientRMProxy.getApplicationAttemptReport(request);
+    } catch (Exception e) {
+      routerMetrics.incrAppAttemptsFailedRetrieved();
+      // A separating space keeps the attempt id readable in the log line.
+      LOG.error("Unable to get the applicationAttempt report for "
+          + request.getApplicationAttemptId() + " to SubCluster "
+          + subClusterId.getId(), e);
+      throw e;
+    }
+
+    // NOTE(review): a null response is only logged; the call is still
+    // counted as a success below and null is returned to the caller.
+    if (response == null) {
+      LOG.error("No response when attempting to retrieve the report of "
+          + "the applicationAttempt "
+          + request.getApplicationAttemptId() + " to SubCluster "
+          + subClusterId.getId());
+    }
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime);
+    return response;
   }
@Override
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 4c18ace..1456a42 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -47,11 +47,15 @@ public class TestRouterMetrics {
Assert.assertEquals(0, metrics.getNumSucceededAppsSubmitted());
Assert.assertEquals(0, metrics.getNumSucceededAppsKilled());
Assert.assertEquals(0, metrics.getNumSucceededAppsRetrieved());
+ Assert.assertEquals(0,
+ metrics.getNumSucceededAppAttemptsRetrieved());
Assert.assertEquals(0, metrics.getAppsFailedCreated());
Assert.assertEquals(0, metrics.getAppsFailedSubmitted());
Assert.assertEquals(0, metrics.getAppsFailedKilled());
Assert.assertEquals(0, metrics.getAppsFailedRetrieved());
+ Assert.assertEquals(0,
+ metrics.getAppAttemptsFailedRetrieved());
LOG.info("Test: aggregate metrics are updated correctly");
}
@@ -197,6 +201,46 @@ public class TestRouterMetrics {
}
/**
+ * This test validates the correctness of the metric:
+ * Retrieved AppAttempt Report
+ * successfully.
+ */
+  @Test
+  public void testSucceededAppAttemptReport() {
+    // Sample count before this test records anything.
+    final long baseline = metrics.getNumSucceededAppAttemptsRetrieved();
+
+    // First successful call: one more sample, mean equals its duration.
+    goodSubCluster.getApplicationAttemptReport(100);
+    long samplesAfterFirst = metrics.getNumSucceededAppAttemptsRetrieved();
+    double meanAfterFirst = metrics.getLatencySucceededGetAppAttemptReport();
+    Assert.assertEquals(baseline + 1, samplesAfterFirst);
+    Assert.assertEquals(100, meanAfterFirst, 0);
+
+    // Second successful call: two more samples, mean of 100 and 200.
+    goodSubCluster.getApplicationAttemptReport(200);
+    long samplesAfterSecond = metrics.getNumSucceededAppAttemptsRetrieved();
+    double meanAfterSecond = metrics.getLatencySucceededGetAppAttemptReport();
+    Assert.assertEquals(baseline + 2, samplesAfterSecond);
+    Assert.assertEquals(150, meanAfterSecond, 0);
+  }
+
+ /**
+ * This test validates the correctness of the metric:
+ * Failed to retrieve AppAttempt Report.
+ */
+  @Test
+  public void testAppAttemptReportFailed() {
+    // Failure count before the mocked failing call.
+    final long failedBefore = metrics.getAppAttemptsFailedRetrieved();
+
+    badSubCluster.getApplicationAttemptReport();
+
+    // Exactly one additional failure must have been recorded.
+    final long failedAfter = metrics.getAppAttemptsFailedRetrieved();
+    Assert.assertEquals(failedBefore + 1, failedAfter);
+  }
+
+ /**
* This test validates the correctness of the metric: Retrieved Multiple Apps
* successfully.
*/
@@ -257,6 +301,11 @@ public class TestRouterMetrics {
metrics.incrAppsFailedRetrieved();
}
+    public void getApplicationAttemptReport() {
+      LOG.info("Mocked: failed getApplicationAttemptReport call");
+      // A failed attempt-report call must be counted through
+      // incrAppAttemptsFailedRetrieved(); incrAppsFailedRetrieved() is the
+      // counter for failed application-report calls.
+      metrics.incrAppAttemptsFailedRetrieved();
+    }
+
public void getApplicationsReport() {
LOG.info("Mocked: failed getApplicationsReport call");
metrics.incrMultipleAppsFailedRetrieved();
@@ -289,6 +338,13 @@ public class TestRouterMetrics {
metrics.succeededAppsRetrieved(duration);
}
+    public void getApplicationAttemptReport(long duration) {
+      // Log first, then record the duration as a successful retrieval.
+      final String template =
+          "Mocked: successful getApplicationAttemptReport call with"
+          + " duration {}";
+      LOG.info(template, duration);
+      metrics.succeededAppAttemptsRetrieved(duration);
+    }
+
public void getApplicationsReport(long duration) {
LOG.info("Mocked: successful getApplicationsReport call with duration
{}",
duration);
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index ee6e7b8..125dfcf 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -26,9 +26,12 @@ import java.util.List;
import java.util.Map;
+import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
@@ -38,6 +41,7 @@ import
org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -177,7 +181,7 @@ public class TestFederationClientInterceptor extends
BaseRouterClientRMTest {
ApplicationId appId) {
ContainerLaunchContext amContainerSpec =
mock(ContainerLaunchContext.class);
ApplicationSubmissionContext context = ApplicationSubmissionContext
- .newInstance(appId, MockApps.newAppName(), "q1",
+ .newInstance(appId, MockApps.newAppName(), "default",
Priority.newInstance(0), amContainerSpec, false, false, -1,
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
@@ -410,6 +414,102 @@ public class TestFederationClientInterceptor extends
BaseRouterClientRMTest {
}
}
+ /**
+ * This test validates the correctness of
+ * GetApplicationAttemptReport in case the
+ * application exists in the cluster.
+ */
+  @Test
+  public void testGetApplicationAttemptReport()
+      throws YarnException, IOException, InterruptedException {
+    LOG.info("Test FederationClientInterceptor: Get " +
+        "ApplicationAttempt Report");
+
+    // Attempt 1 of a freshly generated application id.
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    // Submit the application first; afterwards a home sub-cluster must be
+    // queryable for it.
+    SubmitApplicationRequest submitRequest =
+        mockSubmitApplicationRequest(appId);
+    SubmitApplicationResponse submitResponse =
+        interceptor.submitApplication(submitRequest);
+    Assert.assertNotNull(submitResponse);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    // The attempt report for the submitted application must be returned.
+    GetApplicationAttemptReportResponse attemptReport =
+        interceptor.getApplicationAttemptReport(
+            GetApplicationAttemptReportRequest.newInstance(attemptId));
+    Assert.assertNotNull(attemptReport);
+  }
+
+ /**
+ * This test validates the correctness of
+ * GetApplicationAttemptReport in case the
+ * application does not exist in StateStore.
+ */
+  @Test
+  public void testGetApplicationAttemptNotExists()
+      throws Exception {
+    LOG.info("Test ApplicationClientProtocol: Get " +
+        "ApplicationAttempt Report - Not Exists");
+
+    // An application that was never submitted through the interceptor.
+    ApplicationId unknownAppId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId unknownAttemptId =
+        ApplicationAttemptId.newInstance(unknownAppId, 1);
+    GetApplicationAttemptReportRequest lookup =
+        GetApplicationAttemptReportRequest.newInstance(unknownAttemptId);
+
+    // The text mirrors the message built by the interceptor, including
+    // the absent space before "belongs".
+    String expectedText = "ApplicationAttempt " + unknownAttemptId
+        + "belongs to Application " + unknownAppId
+        + " does not exist in FederationStateStore";
+
+    LambdaTestUtils.intercept(YarnException.class, expectedText,
+        () -> interceptor.getApplicationAttemptReport(lookup));
+  }
+
+ /**
+ * This test validates
+ * the correctness of GetApplicationAttemptReport in case of
+ * empty request.
+ */
+  @Test
+  public void testGetApplicationAttemptEmptyRequest()
+      throws Exception {
+    LOG.info("Test FederationClientInterceptor: Get " +
+        "ApplicationAttempt Report - Empty");
+
+    // All three incomplete requests must be rejected with the same text.
+    final String missingInfo = "Missing getApplicationAttemptReport "
+        + "request or applicationId "
+        + "or applicationAttemptId information.";
+
+    // Case 1: no request at all.
+    LambdaTestUtils.intercept(YarnException.class, missingInfo,
+        () -> interceptor.getApplicationAttemptReport(null));
+
+    // Case 2: request without an attempt id. The request is built inside
+    // the lambda so that any failure there is also seen by intercept().
+    LambdaTestUtils.intercept(YarnException.class, missingInfo,
+        () -> interceptor.getApplicationAttemptReport(
+            GetApplicationAttemptReportRequest.newInstance(null)));
+
+    // Case 3: attempt id without an application id.
+    LambdaTestUtils.intercept(YarnException.class, missingInfo,
+        () -> interceptor.getApplicationAttemptReport(
+            GetApplicationAttemptReportRequest.newInstance(
+                ApplicationAttemptId.newInstance(null, 1))));
+  }
+
+
@Test
public void testGetClusterMetricsRequest() throws YarnException, IOException
{
LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]