This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d9a6792ca91f YARN-11011. Make YARN Router throw Exception to client clearly. (#6211) Contributed by Shilun Fan.
d9a6792ca91f is described below

commit d9a6792ca91f7e501459a2063117c1bd48a9bc42
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Wed Nov 8 10:07:54 2023 +0800

    YARN-11011. Make YARN Router throw Exception to client clearly. (#6211) Contributed by Shilun Fan.
    
    Reviewed-by: Inigo Goiri <inigo...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../yarn/server/router/RouterServerUtil.java       | 12 ++++-
 .../clientrm/FederationClientInterceptor.java      | 40 ++++++++++-----
 .../TestFederationClientInterceptorRetry.java      | 57 ++++++++++++++++++++++
 .../TestableFederationClientInterceptor.java       |  7 +++
 4 files changed, 102 insertions(+), 14 deletions(-)
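For context on the hunks that follow: before this change a client of the Router only saw a generic wrapper message such as "invokeConcurrent Failed, An exception occurred in subClusterIds = 1"; after it, the thrown YarnException carries the underlying cause's own message. Below is a minimal, standalone sketch of the message-composition pattern used in RouterServerUtil. It is illustrative only: RuntimeException and System.err stand in for YarnException and the SLF4J logger so the snippet compiles without Hadoop on the classpath, and the class and method names are made up.

// Illustrative sketch of the pattern: fold the cause's message into the
// text of the thrown exception so the client sees the real error.
public final class LogAndThrowSketch {

  private LogAndThrowSketch() {
  }

  // Mirrors getErrorMsg: append the cause's message when it has one.
  static String errorMsg(String errMsg, Throwable t) {
    return (t != null && t.getMessage() != null) ? errMsg + " " + t.getMessage() : errMsg;
  }

  static void logAndThrow(String errMsg, Throwable t) {
    String newErrMsg = errorMsg(errMsg, t);
    System.err.println(newErrMsg);             // stands in for LOG.error(newErrMsg, t)
    throw new RuntimeException(newErrMsg, t);  // stands in for new YarnException(newErrMsg, t)
  }

  public static void main(String[] args) {
    try {
      logAndThrow("Unable to submitApplication.", new IllegalStateException("RM is stopped"));
    } catch (RuntimeException e) {
      System.out.println(e.getMessage());      // prints: Unable to submitApplication. RM is stopped
    }
  }
}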

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index 096d8dac0a76..130503065d6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -125,8 +125,9 @@ public final class RouterServerUtil {
   public static void logAndThrowException(String errMsg, Throwable t)
       throws YarnException {
     if (t != null) {
-      LOG.error(errMsg, t);
-      throw new YarnException(errMsg, t);
+      String newErrMsg = getErrorMsg(errMsg, t);
+      LOG.error(newErrMsg, t);
+      throw new YarnException(newErrMsg, t);
     } else {
       LOG.error(errMsg);
       throw new YarnException(errMsg);
@@ -146,6 +147,13 @@ public final class RouterServerUtil {
     throw new YarnException(errMsg);
   }
 
+  private static String getErrorMsg(String errMsg, Throwable t) {
+    if (t.getMessage() != null) {
+      return errMsg + " " + t.getMessage();
+    }
+    return errMsg;
+  }
+
   public static <R> R createRequestInterceptorChain(Configuration conf, String pipeLineClassName,
       String interceptorClassName, Class<R> clazz) {
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 35b3e6eeb2bd..f9f08583ba59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -24,13 +24,13 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -842,13 +842,27 @@ public class FederationClientInterceptor
     // Generate parallel Callable tasks
     for (SubClusterId subClusterId : subClusterIds) {
       callables.add(() -> {
-        ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
-        String methodName = request.getMethodName();
-        Class<?>[] types = request.getTypes();
-        Object[] params = request.getParams();
-        Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
-        Object result = method.invoke(protocol, params);
-        return Pair.of(subClusterId, result);
+        try {
+          ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
+          String methodName = request.getMethodName();
+          Class<?>[] types = request.getTypes();
+          Object[] params = request.getParams();
+          Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
+          Object result = method.invoke(protocol, params);
+          return Pair.of(subClusterId, result);
+        } catch (Exception e) {
+          Throwable cause = e.getCause();
+          // We use Callable. If the exception thrown here is InvocationTargetException,
+          // it is a wrapped exception. We need to get the real cause of the error.
+          if (cause != null && cause instanceof InvocationTargetException) {
+            cause = cause.getCause();
+          }
+          String errMsg = (cause.getMessage() != null) ? cause.getMessage() : "UNKNOWN";
+          YarnException yarnException =
+              new YarnException(String.format("subClusterId %s exec %s error %s.",
+              subClusterId, request.getMethodName(), errMsg), e);
+          return Pair.of(subClusterId, yarnException);
+        }
       });
     }
 
@@ -862,8 +876,11 @@ public class FederationClientInterceptor
           Pair<SubClusterId, Object> pair = future.get();
           subClusterId = pair.getKey();
           Object result = pair.getValue();
+          if (result instanceof YarnException) {
+            throw YarnException.class.cast(result);
+          }
           results.put(subClusterId, clazz.cast(result));
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException | YarnException e) {
           Throwable cause = e.getCause();
           LOG.error("Cannot execute {} on {} : {}", request.getMethodName(),
               subClusterId.getId(), cause.getMessage());
@@ -877,9 +894,8 @@ public class FederationClientInterceptor
     // All sub-clusters return results to be considered successful,
     // otherwise an exception will be thrown.
     if (exceptions != null && !exceptions.isEmpty()) {
-      Set<SubClusterId> subClusterIdSets = exceptions.keySet();
-      throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " +
-          StringUtils.join(subClusterIdSets, ","));
+      throw new YarnException("invokeConcurrent Failed = " +
+          StringUtils.join(exceptions.values(), ","));
     }
 
     // return result
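The bulk of the hunk above is in invokeConcurrent: each per-sub-cluster Callable now catches its own failure, unwraps the reflective InvocationTargetException, and returns a YarnException that names the sub-cluster, the method, and the real message, which the collecting loop then rethrows. Below is a compact, dependency-free sketch of that fan-out/collect pattern under stated assumptions: plain String ids, Map.Entry, and RuntimeException stand in for SubClusterId, Pair, and YarnException, and every name is illustrative rather than Hadoop's own.

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the invokeConcurrent change: a failing task returns its exception
// as the result so the caller can surface it with sub-cluster context.
public final class InvokeConcurrentSketch {

  static Map.Entry<String, Object> callOne(String subClusterId, String methodName,
      Callable<Object> rpc) {
    try {
      return Map.entry(subClusterId, rpc.call());
    } catch (Exception e) {
      // Reflective calls wrap the real failure in InvocationTargetException; unwrap it.
      Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
      if (cause == null) {
        cause = e;
      }
      String errMsg = (cause.getMessage() != null) ? cause.getMessage() : "UNKNOWN";
      return Map.entry(subClusterId, (Object) new RuntimeException(
          String.format("subClusterId %s exec %s error %s.", subClusterId, methodName, errMsg), e));
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<Map.Entry<String, Object>>> futures = new ArrayList<>();
    for (String subClusterId : List.of("1", "2")) {
      futures.add(pool.submit(() -> callOne(subClusterId, "getClusterMetrics", () -> {
        throw new IllegalStateException("RM is stopped");   // simulated bad sub-cluster
      })));
    }
    for (Future<Map.Entry<String, Object>> future : futures) {
      Object result = future.get().getValue();
      if (result instanceof RuntimeException) {
        // prints e.g. "subClusterId 1 exec getClusterMetrics error RM is stopped."
        System.out.println(((RuntimeException) result).getMessage());
      }
    }
    pool.shutdown();
  }
}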
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
index 25ca03353e1c..bf7ef7d17913 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -353,4 +354,60 @@ public class TestFederationClientInterceptorRetry
     SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
     Assert.assertEquals(expectSubCluster, respSubClusterId);
   }
+
+  @Test
+  public void testSubmitApplicationTwoBadNodeWithRealError() throws Exception {
+    LOG.info("Test submitApplication with two bad SubClusters.");
+    setupCluster(Arrays.asList(bad1, bad2));
+    interceptor.setNumSubmitRetries(1);
+
+    final ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 5);
+
+    final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+    LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
+        () -> interceptor.submitApplication(request));
+  }
+
+  @Test
+  public void testSubmitApplicationOneBadNodeWithRealError() throws Exception {
+    LOG.info("Test submitApplication with one bad SubCluster.");
+    setupCluster(Arrays.asList(bad1));
+    interceptor.setNumSubmitRetries(0);
+
+    final ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 6);
+
+    final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+    LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
+        () -> interceptor.submitApplication(request));
+  }
+
+  @Test
+  public void testGetClusterMetricsTwoBadNodeWithRealError() throws Exception {
+    LOG.info("Test getClusterMetrics with two bad SubClusters.");
+    setupCluster(Arrays.asList(bad1, bad2));
+    GetClusterMetricsRequest request = GetClusterMetricsRequest.newInstance();
+
+    LambdaTestUtils.intercept(YarnException.class,
+        "subClusterId 1 exec getClusterMetrics error RM is stopped.",
+        () -> interceptor.getClusterMetrics(request));
+
+    LambdaTestUtils.intercept(YarnException.class,
+        "subClusterId 2 exec getClusterMetrics error RM is stopped.",
+        () -> interceptor.getClusterMetrics(request));
+  }
+
+  @Test
+  public void testGetClusterMetricsOneBadNodeWithRealError() throws Exception {
+    LOG.info("Test getClusterMetrics with one bad SubCluster.");
+    setupCluster(Arrays.asList(bad1));
+    GetClusterMetricsRequest request = GetClusterMetricsRequest.newInstance();
+
+    LambdaTestUtils.intercept(YarnException.class,
+        "subClusterId 1 exec getClusterMetrics error RM is stopped.",
+        () -> interceptor.getClusterMetrics(request));
+  }
 }
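The new tests above use Hadoop's LambdaTestUtils.intercept to assert both the exception type and that the message names the failing sub-cluster and the underlying "RM is stopped" error. As a hedged, dependency-light illustration of the same kind of check, here is a sketch written against plain JUnit 4 (assuming JUnit 4.13's Assert.assertThrows is available); the service method is a made-up stand-in for the interceptor call, not part of this patch.

import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class ClearRouterErrorTest {

  // Hypothetical stand-in for the Router call under test: fails the way a
  // stopped sub-cluster RM would surface after this patch.
  private static Object getClusterMetrics() throws Exception {
    throw new Exception("subClusterId 1 exec getClusterMetrics error RM is stopped.");
  }

  @Test
  public void testClientSeesRealError() {
    Exception e = assertThrows(Exception.class, ClearRouterErrorTest::getClusterMetrics);
    // The client-visible message should carry the sub-cluster id and the real cause.
    assertTrue(e.getMessage().contains("exec getClusterMetrics error RM is stopped"));
  }
}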
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
index 8e80bf2c0adb..9d6b87e8cc99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -126,6 +128,11 @@ public class TestableFederationClientInterceptor
       throw new ConnectException("RM is stopped");
     }
 
+    @Override
+    public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request)
+        throws YarnException {
+      throw new YarnException("RM is stopped");
+    }
   }
 
   /**

