This is an automated email from the ASF dual-hosted git repository.

ztang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bb528e3  YARN-10463: For Federation, we should support 
getApplicationAttemptReport. (#2563)
bb528e3 is described below

commit bb528e32399524edc567b1af91f8906218917e9a
Author: zhuqi <[email protected]>
AuthorDate: Mon Dec 21 10:04:16 2020 +0800

    YARN-10463: For Federation, we should support getApplicationAttemptReport. 
(#2563)
    
    Qi Zhu via Zhankun Tang
---
 .../hadoop/yarn/server/router/RouterMetrics.java   |  37 +++++++-
 .../clientrm/FederationClientInterceptor.java      |  70 +++++++++++++-
 .../yarn/server/router/TestRouterMetrics.java      |  56 +++++++++++
 .../clientrm/TestFederationClientInterceptor.java  | 102 ++++++++++++++++++++-
 4 files changed, 262 insertions(+), 3 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 884e06e..24fdbb90 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -51,6 +51,8 @@ public final class RouterMetrics {
   private MutableGaugeInt numAppsFailedRetrieved;
   @Metric("# of multiple applications reports failed to be retrieved")
   private MutableGaugeInt numMultipleAppsFailedRetrieved;
+  @Metric("# of applicationAttempt reports failed to be retrieved")
+  private MutableGaugeInt numAppAttemptsFailedRetrieved;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -64,6 +66,10 @@ public final class RouterMetrics {
   @Metric("Total number of successful Retrieved multiple apps reports and "
       + "latency(ms)")
   private MutableRate totalSucceededMultipleAppsRetrieved;
+  @Metric("Total number of successful Retrieved " +
+          "appAttempt reports and latency(ms)")
+  private MutableRate totalSucceededAppAttemptsRetrieved;
+
 
   /**
    * Provide quantile counters for all latencies.
@@ -73,6 +79,7 @@ public final class RouterMetrics {
   private MutableQuantiles killApplicationLatency;
   private MutableQuantiles getApplicationReportLatency;
   private MutableQuantiles getApplicationsReportLatency;
+  private MutableQuantiles getApplicationAttemptReportLatency;
 
   private static volatile RouterMetrics INSTANCE = null;
   private static MetricsRegistry registry;
@@ -92,6 +99,10 @@ public final class RouterMetrics {
     getApplicationsReportLatency =
         registry.newQuantiles("getApplicationsReportLatency",
             "latency of get applications report", "ops", "latency", 10);
+    getApplicationAttemptReportLatency =
+        registry.newQuantiles("getApplicationAttemptReportLatency",
+                    "latency of get applicationattempt " +
+                            "report", "ops", "latency", 10);
   }
 
   public static RouterMetrics getMetrics() {
@@ -134,6 +145,11 @@ public final class RouterMetrics {
   }
 
   @VisibleForTesting
+  public long getNumSucceededAppAttemptsRetrieved() {
+    return totalSucceededAppAttemptsRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
   public long getNumSucceededMultipleAppsRetrieved() {
     return totalSucceededMultipleAppsRetrieved.lastStat().numSamples();
   }
@@ -154,6 +170,11 @@ public final class RouterMetrics {
   }
 
   @VisibleForTesting
+  public double getLatencySucceededGetAppAttemptReport() {
+    return totalSucceededAppAttemptsRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
   public double getLatencySucceededGetAppReport() {
     return totalSucceededAppsRetrieved.lastStat().mean();
   }
@@ -184,6 +205,11 @@ public final class RouterMetrics {
   }
 
   @VisibleForTesting
+  public int getAppAttemptsFailedRetrieved() {
+    return numAppAttemptsFailedRetrieved.value();
+  }
+
+  @VisibleForTesting
   public int getMultipleAppsFailedRetrieved() {
     return numMultipleAppsFailedRetrieved.value();
   }
@@ -213,6 +239,11 @@ public final class RouterMetrics {
     getApplicationsReportLatency.add(duration);
   }
 
+  public void succeededAppAttemptsRetrieved(long duration) {
+    totalSucceededAppAttemptsRetrieved.add(duration);
+    getApplicationAttemptReportLatency.add(duration);
+  }
+
   public void incrAppsFailedCreated() {
     numAppsFailedCreated.incr();
   }
@@ -233,4 +264,8 @@ public final class RouterMetrics {
     numMultipleAppsFailedRetrieved.incr();
   }
 
-}
\ No newline at end of file
+  public void incrAppAttemptsFailedRetrieved() {
+    numAppAttemptsFailedRetrieved.incr();
+  }
+
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index a721fe0..7e8e7af 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -749,11 +749,79 @@ public class FederationClientInterceptor
     throw new NotImplementedException("Code is not implemented");
   }
 
+  /**
+   * The YARN Router will forward to the respective YARN RM in which the AM is
+   * running.
+   *
+   * Possible failure:
+   *
+   * Client: identical behavior as {@code ClientRMService}.
+   *
+   * Router: the Client will time out and resubmit the request.
+   *
+   * ResourceManager: the Router will time out and the call will fail.
+   *
+   * State Store: the Router will time out and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   */
   @Override
   public GetApplicationAttemptReportResponse getApplicationAttemptReport(
       GetApplicationAttemptReportRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long startTime = clock.getTime();
+
+    if (request == null || request.getApplicationAttemptId() == null
+            || request.getApplicationAttemptId().getApplicationId() == null) {
+      routerMetrics.incrAppAttemptsFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+              "Missing getApplicationAttemptReport " +
+                      "request or applicationId " +
+                      "or applicationAttemptId information.",
+              null);
+    }
+
+    SubClusterId subClusterId = null;
+
+    try {
+      subClusterId = federationFacade
+              .getApplicationHomeSubCluster(
+                      request.getApplicationAttemptId().getApplicationId());
+    } catch (YarnException e) {
+      routerMetrics.incrAppAttemptsFailedRetrieved();
+      RouterServerUtil
+              .logAndThrowException("ApplicationAttempt " +
+                      request.getApplicationAttemptId() +
+                      "belongs to Application " +
+                      request.getApplicationAttemptId().getApplicationId() +
+                      " does not exist in FederationStateStore", e);
+    }
+
+    ApplicationClientProtocol clientRMProxy =
+            getClientRMProxyForSubCluster(subClusterId);
+
+    GetApplicationAttemptReportResponse response = null;
+    try {
+      response = clientRMProxy.getApplicationAttemptReport(request);
+    } catch (Exception e) {
+      routerMetrics.incrAppAttemptsFailedRetrieved();
+      LOG.error("Unable to get the applicationAttempt report for "
+              + request.getApplicationAttemptId() + " to SubCluster "
+              + subClusterId.getId(), e);
+      throw e;
+    }
+
+    if (response == null) {
+      LOG.error("No response when attempting to retrieve the report of "
+              + "the applicationAttempt "
+              + request.getApplicationAttemptId() + " to SubCluster "
+              + subClusterId.getId());
+    }
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime);
+    return response;
   }
 
   @Override
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 4c18ace..1456a42 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -47,11 +47,15 @@ public class TestRouterMetrics {
     Assert.assertEquals(0, metrics.getNumSucceededAppsSubmitted());
     Assert.assertEquals(0, metrics.getNumSucceededAppsKilled());
     Assert.assertEquals(0, metrics.getNumSucceededAppsRetrieved());
+    Assert.assertEquals(0,
+        metrics.getNumSucceededAppAttemptsRetrieved());
 
     Assert.assertEquals(0, metrics.getAppsFailedCreated());
     Assert.assertEquals(0, metrics.getAppsFailedSubmitted());
     Assert.assertEquals(0, metrics.getAppsFailedKilled());
     Assert.assertEquals(0, metrics.getAppsFailedRetrieved());
+    Assert.assertEquals(0,
+        metrics.getAppAttemptsFailedRetrieved());
 
     LOG.info("Test: aggregate metrics are updated correctly");
   }
@@ -197,6 +201,46 @@ public class TestRouterMetrics {
   }
 
   /**
+   * This test validates the correctness of the metric:
+   * Retrieved AppAttempt Report
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppAttemptReport() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppAttemptsRetrieved();
+
+    goodSubCluster.getApplicationAttemptReport(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppAttemptsRetrieved());
+    Assert.assertEquals(100,
+        metrics.getLatencySucceededGetAppAttemptReport(), 0);
+
+    goodSubCluster.getApplicationAttemptReport(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppAttemptsRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededGetAppAttemptReport(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric:
+   * Failed to retrieve AppAttempt Report.
+   */
+  @Test
+  public void testAppAttemptReportFailed() {
+
+    long totalBadbefore = metrics.getAppAttemptsFailedRetrieved();
+
+    badSubCluster.getApplicationAttemptReport();
+
+    Assert.assertEquals(totalBadbefore + 1,
+        metrics.getAppAttemptsFailedRetrieved());
+  }
+
+  /**
    * This test validates the correctness of the metric: Retrieved Multiple Apps
    * successfully.
    */
@@ -257,6 +301,11 @@ public class TestRouterMetrics {
       metrics.incrAppsFailedRetrieved();
     }
 
+    public void getApplicationAttemptReport() {
+      LOG.info("Mocked: failed getApplicationAttemptReport call");
+      metrics.incrAppAttemptsFailedRetrieved();
+    }
+
     public void getApplicationsReport() {
       LOG.info("Mocked: failed getApplicationsReport call");
       metrics.incrMultipleAppsFailedRetrieved();
@@ -289,6 +338,13 @@ public class TestRouterMetrics {
       metrics.succeededAppsRetrieved(duration);
     }
 
+    public void getApplicationAttemptReport(long duration) {
+      LOG.info("Mocked: successful " +
+              "getApplicationAttemptReport call with duration {}",
+          duration);
+      metrics.succeededAppAttemptsRetrieved(duration);
+    }
+
     public void getApplicationsReport(long duration) {
       LOG.info("Mocked: successful getApplicationsReport call with duration 
{}",
           duration);
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index ee6e7b8..125dfcf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -26,9 +26,12 @@ import java.util.List;
 
 import java.util.Map;
 
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
@@ -38,6 +41,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -177,7 +181,7 @@ public class TestFederationClientInterceptor extends 
BaseRouterClientRMTest {
       ApplicationId appId) {
     ContainerLaunchContext amContainerSpec = 
mock(ContainerLaunchContext.class);
     ApplicationSubmissionContext context = ApplicationSubmissionContext
-        .newInstance(appId, MockApps.newAppName(), "q1",
+        .newInstance(appId, MockApps.newAppName(), "default",
             Priority.newInstance(0), amContainerSpec, false, false, -1,
             Resources.createResource(
                 YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
@@ -410,6 +414,102 @@ public class TestFederationClientInterceptor extends 
BaseRouterClientRMTest {
     }
   }
 
+  /**
+   * This test validates the correctness of
+   * GetApplicationAttemptReport in case the
+   * application exists in the cluster.
+   */
+  @Test
+  public void testGetApplicationAttemptReport()
+          throws YarnException, IOException, InterruptedException {
+    LOG.info("Test FederationClientInterceptor: " +
+            "Get ApplicationAttempt Report");
+
+    ApplicationId appId =
+            ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptId =
+            ApplicationAttemptId.newInstance(appId, 1);
+
+    SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+    // Submit the application we want the applicationAttempt report later
+    SubmitApplicationResponse response = 
interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    GetApplicationAttemptReportRequest requestGet =
+            GetApplicationAttemptReportRequest.newInstance(appAttemptId);
+
+    GetApplicationAttemptReportResponse responseGet =
+            interceptor.getApplicationAttemptReport(requestGet);
+
+    Assert.assertNotNull(responseGet);
+  }
+
+  /**
+   * This test validates the correctness of
+   * GetApplicationAttemptReport in case the
+   * application does not exist in StateStore.
+   */
+  @Test
+  public void testGetApplicationAttemptNotExists()
+          throws Exception {
+    LOG.info(
+            "Test ApplicationClientProtocol: " +
+                    "Get ApplicationAttempt Report - Not Exists");
+    ApplicationId appId =
+            ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptID =
+            ApplicationAttemptId.newInstance(appId, 1);
+    GetApplicationAttemptReportRequest requestGet =
+            GetApplicationAttemptReportRequest.newInstance(appAttemptID);
+
+    LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " +
+            appAttemptID + "belongs to Application " +
+            appId + " does not exist in FederationStateStore",
+        () -> interceptor.getApplicationAttemptReport(requestGet));
+  }
+
+  /**
+   * This test validates
+   * the correctness of GetApplicationAttemptReport in case of
+   * empty request.
+   */
+  @Test
+  public void testGetApplicationAttemptEmptyRequest()
+          throws Exception {
+    LOG.info("Test FederationClientInterceptor: " +
+                    "Get ApplicationAttempt Report - Empty");
+
+    LambdaTestUtils.intercept(YarnException.class,
+            "Missing getApplicationAttemptReport " +
+                    "request or applicationId " +
+                    "or applicationAttemptId information.",
+        () -> interceptor.getApplicationAttemptReport(null));
+
+    LambdaTestUtils.intercept(YarnException.class,
+            "Missing getApplicationAttemptReport " +
+                    "request or applicationId " +
+                    "or applicationAttemptId information.",
+        () -> interceptor
+                    .getApplicationAttemptReport(
+                            GetApplicationAttemptReportRequest
+                                    .newInstance(null)));
+
+    LambdaTestUtils.intercept(YarnException.class,
+            "Missing getApplicationAttemptReport " +
+                    "request or applicationId " +
+                    "or applicationAttemptId information.",
+        () -> interceptor
+                    .getApplicationAttemptReport(
+                            GetApplicationAttemptReportRequest.newInstance(
+                                    ApplicationAttemptId
+                                            .newInstance(null, 1)
+                            )));
+  }
+
+
   @Test
   public void testGetClusterMetricsRequest() throws YarnException, IOException 
{
     LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to