This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f71fd885be4 YARN-11373. [Federation] Support refreshQueues and
refreshNodes APIs for Federation. (#5146)
f71fd885be4 is described below

commit f71fd885be48bfa1f5bd0686519bed90a2fda561
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Wed Dec 7 00:17:05 2022 +0800

    YARN-11373. [Federation] Support refreshQueues and refreshNodes APIs for
Federation. (#5146)
---
 .../api/protocolrecords/RefreshNodesRequest.java   | 25 ++++++++
 .../api/protocolrecords/RefreshQueuesRequest.java  | 17 +++++
 ...arn_server_resourcemanager_service_protos.proto |  2 +
 .../filecontroller/ifile/package-info.java         |  6 +-
 .../filecontroller/tfile/package-info.java         |  6 +-
 .../hadoop/yarn/security/admin/package-info.java   |  6 +-
 .../impl/pb/RefreshNodesRequestPBImpl.java         | 22 ++++++-
 .../impl/pb/RefreshQueuesRequestPBImpl.java        | 33 ++++++++--
 .../hadoop/yarn/server/router/RouterMetrics.java   | 38 ++++++++++-
 .../rmadmin/FederationRMAdminInterceptor.java      | 73 ++++++++++++++++++++--
 .../router/rmadmin/RMAdminProtocolMethod.java      | 62 ++++++++++++++++--
 .../rmadmin/TestFederationRMAdminInterceptor.java  | 70 ++++++++++++++++++++-
 12 files changed, 330 insertions(+), 30 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
index fcbef039f27..1675e3ace42 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
@@ -53,6 +53,17 @@ public abstract class RefreshNodesRequest {
     return request;
   }
 
+  /**
+   * Build a RefreshNodesRequest carrying a decommission type, a decommission
+   * timeout and the id of the sub-cluster the request targets.
+   *
+   * @param decommissionType how the nodes should be decommissioned.
+   * @param timeout value handed to {@link #setDecommissionTimeout}.
+   * @param subClusterId target sub-cluster id; may be null.
+   * @return the populated request record.
+   */
+  @Private
+  @Unstable
+  public static RefreshNodesRequest newInstance(
+      DecommissionType decommissionType, Integer timeout, String subClusterId) {
+    RefreshNodesRequest refreshNodesRequest =
+        Records.newRecord(RefreshNodesRequest.class);
+    refreshNodesRequest.setDecommissionType(decommissionType);
+    refreshNodesRequest.setDecommissionTimeout(timeout);
+    refreshNodesRequest.setSubClusterId(subClusterId);
+    return refreshNodesRequest;
+  }
+
   /**
    * Set the DecommissionType.
    * 
@@ -80,4 +91,18 @@ public abstract class RefreshNodesRequest {
    * @return decommissionTimeout
    */
   public abstract Integer getDecommissionTimeout();
+
+  /**
+   * Get the id of the sub-cluster whose nodes this request targets.
+   *
+   * @return subClusterId, or {@code null} if none has been set (the protobuf
+   *     implementation returns null for an unset field).
+   */
+  public abstract String getSubClusterId();
+
+  /**
+   * Set the id of the sub-cluster whose nodes this request targets.
+   *
+   * @param subClusterId subCluster Id; in the protobuf implementation a
+   *     {@code null} value clears any previously set id.
+   */
+  public abstract void setSubClusterId(String subClusterId);
 }
\ No newline at end of file
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshQueuesRequest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshQueuesRequest.java
index eff4b7f4d28..ba332ad40cd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshQueuesRequest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshQueuesRequest.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -33,4 +34,20 @@ public abstract class RefreshQueuesRequest {
         Records.newRecord(RefreshQueuesRequest.class);
     return request;
   }
+
+  /**
+   * Create a RefreshQueuesRequest that targets a single sub-cluster.
+   *
+   * This factory is new, and the subClusterId field it populates is itself
+   * marked {@code @Unstable}, so the factory is {@code @Unstable} as well
+   * rather than {@code @Stable}.
+   *
+   * @param subClusterId id of the sub-cluster whose queues should be
+   *     refreshed. When the request is handled by the Router's federation
+   *     interceptor, a null or blank id means all active sub-clusters.
+   * @return a new RefreshQueuesRequest.
+   */
+  @Public
+  @Unstable
+  public static RefreshQueuesRequest newInstance(String subClusterId) {
+    RefreshQueuesRequest request = Records.newRecord(RefreshQueuesRequest.class);
+    request.setSubClusterId(subClusterId);
+    return request;
+  }
+
+  /**
+   * Get the id of the sub-cluster whose queues should be refreshed.
+   *
+   * @return subClusterId, or {@code null} if none has been set (the protobuf
+   *     implementation returns null for an unset field).
+   */
+  @Public
+  @Unstable
+  public abstract String getSubClusterId();
+
+  /**
+   * Set the id of the sub-cluster whose queues should be refreshed.
+   *
+   * @param subClusterId subCluster Id; in the protobuf implementation a
+   *     {@code null} value clears any previously set id.
+   */
+  @Private
+  @Unstable
+  public abstract void setSubClusterId(String subClusterId);
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index 3f9913b9896..e1bf9edfccb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -32,6 +32,7 @@ package hadoop.yarn;
 import "yarn_protos.proto";
 
 message RefreshQueuesRequestProto {
+  // Optional target subCluster. When unset (or blank), the Router's
+  // federation interceptor refreshes the queues of all active subClusters.
+  optional string sub_cluster_id = 1;
 }
+// Carries no fields; receiving a response means the refresh succeeded.
 message RefreshQueuesResponseProto {
 }
@@ -39,6 +40,7 @@ message RefreshQueuesResponseProto {
 message RefreshNodesRequestProto {
   optional DecommissionTypeProto decommissionType = 1 [default = NORMAL];
   optional int32 decommissionTimeout = 2;
+  // Optional target subCluster. When unset (or blank), the Router's
+  // federation interceptor refreshes the nodes of all active subClusters.
+  optional string sub_cluster_id = 3;
 }
+// Carries no fields; receiving a response means the refresh succeeded.
 message RefreshNodesResponseProto {
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
index 08ddecef5db..9cbc99baad8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@InterfaceAudience.Public
+@Public
 package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
-import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
index b2e91ab48a9..e014350ec25 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@InterfaceAudience.Public
+@Public
 package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
-import org.apache.hadoop.classification.InterfaceAudience;
\ No newline at end of file
+import org.apache.hadoop.classification.InterfaceAudience.Public;
\ No newline at end of file
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/package-info.java
index c66be222aea..99b857ac2ab 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/package-info.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/package-info.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@InterfaceAudience.Public
+@Public
 package org.apache.hadoop.yarn.security.admin;
-import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
index 62a82912b59..a14aae74f6b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
@@ -31,9 +31,9 @@ import org.apache.hadoop.thirdparty.protobuf.TextFormat;
 @Private
 @Unstable
 public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
-  RefreshNodesRequestProto proto = 
RefreshNodesRequestProto.getDefaultInstance();
-  RefreshNodesRequestProto.Builder builder = null;
-  boolean viaProto = false;
+  private RefreshNodesRequestProto proto = 
RefreshNodesRequestProto.getDefaultInstance();
+  private RefreshNodesRequestProto.Builder builder = null;
+  private boolean viaProto = false;
   private DecommissionType decommissionType;
 
   public RefreshNodesRequestPBImpl() {
@@ -123,6 +123,22 @@ public class RefreshNodesRequestPBImpl extends 
RefreshNodesRequest {
     return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null;
   }
 
+  /**
+   * Read the target sub-cluster id from the proto or from the pending builder.
+   *
+   * @return the sub-cluster id, or null when the field is not set.
+   */
+  @Override
+  public synchronized String getSubClusterId() {
+    RefreshNodesRequestProtoOrBuilder source = viaProto ? proto : builder;
+    if (!source.hasSubClusterId()) {
+      return null;
+    }
+    return source.getSubClusterId();
+  }
+
+  /**
+   * Store the target sub-cluster id; a null id clears the field.
+   *
+   * @param subClusterId sub-cluster id, or null to unset it.
+   */
+  @Override
+  public synchronized void setSubClusterId(String subClusterId) {
+    maybeInitBuilder();
+    if (subClusterId != null) {
+      builder.setSubClusterId(subClusterId);
+    } else {
+      builder.clearSubClusterId();
+    }
+  }
+
+  /** Converts a DecommissionTypeProto to the DecommissionType with the same name. */
   private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) {
     return DecommissionType.valueOf(p.name());
   }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshQueuesRequestPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshQueuesRequestPBImpl.java
index c21ec6d362c..2c174ad18fb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshQueuesRequestPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshQueuesRequestPBImpl.java
@@ -21,6 +21,7 @@ package 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import 
org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
+import 
org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
 
 import org.apache.hadoop.thirdparty.protobuf.TextFormat;
@@ -29,9 +30,9 @@ import org.apache.hadoop.thirdparty.protobuf.TextFormat;
 @Unstable
 public class RefreshQueuesRequestPBImpl extends RefreshQueuesRequest {
 
-  RefreshQueuesRequestProto proto = 
RefreshQueuesRequestProto.getDefaultInstance();
-  RefreshQueuesRequestProto.Builder builder = null;
-  boolean viaProto = false;
+  private RefreshQueuesRequestProto proto = 
RefreshQueuesRequestProto.getDefaultInstance();
+  private RefreshQueuesRequestProto.Builder builder = null;
+  private boolean viaProto = false;
   
   public RefreshQueuesRequestPBImpl() {
     builder = RefreshQueuesRequestProto.newBuilder();
@@ -55,8 +56,9 @@ public class RefreshQueuesRequestPBImpl extends 
RefreshQueuesRequest {
 
   @Override
   public boolean equals(Object other) {
-    if (other == null)
+    if (other == null) {
       return false;
+    }
     if (other.getClass().isAssignableFrom(this.getClass())) {
       return this.getProto().equals(this.getClass().cast(other).getProto());
     }
@@ -67,4 +69,27 @@ public class RefreshQueuesRequestPBImpl extends 
RefreshQueuesRequest {
   public String toString() {
+    // Debug text form of the proto, produced by TextFormat.shortDebugString.
     return TextFormat.shortDebugString(getProto());
   }
+
+  /**
+   * Ensure a builder seeded from the current proto is available for
+   * mutation, and mark the builder as the authoritative copy.
+   */
+  private void maybeInitBuilder() {
+    boolean needsFreshBuilder = viaProto || builder == null;
+    if (needsFreshBuilder) {
+      builder = RefreshQueuesRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /**
+   * Read the target sub-cluster id from the proto or from the pending builder.
+   *
+   * @return the sub-cluster id, or null when the field is not set.
+   */
+  @Override
+  public String getSubClusterId() {
+    RefreshQueuesRequestProtoOrBuilder source = viaProto ? proto : builder;
+    if (!source.hasSubClusterId()) {
+      return null;
+    }
+    return source.getSubClusterId();
+  }
+
+  /**
+   * Store the target sub-cluster id; a null id clears the field.
+   *
+   * @param clusterId sub-cluster id, or null to unset it.
+   */
+  @Override
+  public void setSubClusterId(String clusterId) {
+    maybeInitBuilder();
+    if (clusterId != null) {
+      builder.setSubClusterId(clusterId);
+    } else {
+      builder.clearSubClusterId();
+    }
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 31d838d1b3e..3268889c95c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -127,6 +127,8 @@ public final class RouterMetrics {
   private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved;
   @Metric("# of checkUserAccessToQueue failed to be retrieved")
   private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved;
+  @Metric("# of refreshNodes failed to be retrieved")
+  private MutableGaugeInt numRefreshNodesFailedRetrieved;
   @Metric("# of getDelegationToken failed to be retrieved")
   private MutableGaugeInt numGetDelegationTokenFailedRetrieved;
   @Metric("# of renewDelegationToken failed to be retrieved")
@@ -221,6 +223,8 @@ public final class RouterMetrics {
   private MutableRate totalSucceededGetRMNodeLabelsRetrieved;
   @Metric("Total number of successful Retrieved CheckUserAccessToQueue and 
latency(ms)")
   private MutableRate totalSucceededCheckUserAccessToQueueRetrieved;
+  @Metric("Total number of successful Retrieved RefreshNodes and latency(ms)")
+  private MutableRate totalSucceededRefreshNodesRetrieved;
   @Metric("Total number of successful Retrieved GetDelegationToken and 
latency(ms)")
   private MutableRate totalSucceededGetDelegationTokenRetrieved;
   @Metric("Total number of successful Retrieved RenewDelegationToken and 
latency(ms)")
@@ -271,9 +275,10 @@ public final class RouterMetrics {
   private MutableQuantiles getUpdateQueueLatency;
   private MutableQuantiles getAppTimeoutLatency;
   private MutableQuantiles getAppTimeoutsLatency;
-  private MutableQuantiles getRefreshQueuesLatency;
+  private MutableQuantiles refreshQueuesLatency;
   private MutableQuantiles getRMNodeLabelsLatency;
   private MutableQuantiles checkUserAccessToQueueLatency;
+  private MutableQuantiles refreshNodesLatency;
   private MutableQuantiles getDelegationTokenLatency;
   private MutableQuantiles renewDelegationTokenLatency;
   private MutableQuantiles cancelDelegationTokenLatency;
@@ -430,7 +435,7 @@ public final class RouterMetrics {
     getAppTimeoutsLatency = registry.newQuantiles("getAppTimeoutsLatency",
          "latency of get apptimeouts timeouts", "ops", "latency", 10);
 
-    getRefreshQueuesLatency = registry.newQuantiles("getRefreshQueuesLatency",
+    refreshQueuesLatency = registry.newQuantiles("refreshQueuesLatency",
          "latency of get refresh queues timeouts", "ops", "latency", 10);
 
     getRMNodeLabelsLatency = registry.newQuantiles("getRMNodeLabelsLatency",
@@ -439,6 +444,9 @@ public final class RouterMetrics {
     checkUserAccessToQueueLatency = 
registry.newQuantiles("checkUserAccessToQueueLatency",
         "latency of get apptimeouts timeouts", "ops", "latency", 10);
 
+    refreshNodesLatency = registry.newQuantiles("refreshNodesLatency",
+        "latency of get refresh nodes timeouts", "ops", "latency", 10);
+
     getDelegationTokenLatency = 
registry.newQuantiles("getDelegationTokenLatency",
         "latency of get delegation token timeouts", "ops", "latency", 10);
 
@@ -447,6 +455,7 @@ public final class RouterMetrics {
 
     cancelDelegationTokenLatency = 
registry.newQuantiles("cancelDelegationTokenLatency",
         "latency of cancel delegation token timeouts", "ops", "latency", 10);
+
   }
 
   public static RouterMetrics getMetrics() {
@@ -673,6 +682,11 @@ public final class RouterMetrics {
     return totalSucceededRefreshQueuesRetrieved.lastStat().numSamples();
   }
 
+  /**
+   * Number of samples in the last snapshot of the successful refreshNodes rate.
+   *
+   * @return sample count reported by {@code lastStat()}.
+   */
+  @VisibleForTesting
+  public long getNumSucceededRefreshNodesRetrieved() {
+    MutableRate refreshNodesRate = totalSucceededRefreshNodesRetrieved;
+    return refreshNodesRate.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   public long getNumSucceededGetRMNodeLabelsRetrieved() {
     return totalSucceededGetRMNodeLabelsRetrieved.lastStat().numSamples();
@@ -903,6 +917,11 @@ public final class RouterMetrics {
     return totalSucceededRefreshQueuesRetrieved.lastStat().mean();
   }
 
+  /**
+   * Mean of the last snapshot of the successful refreshNodes rate.
+   *
+   * @return mean reported by {@code lastStat()}.
+   */
+  @VisibleForTesting
+  public double getLatencySucceededRefreshNodesRetrieved() {
+    MutableRate refreshNodesRate = totalSucceededRefreshNodesRetrieved;
+    return refreshNodesRate.lastStat().mean();
+  }
+
   @VisibleForTesting
   public double getLatencySucceededGetRMNodeLabelsRetrieved() {
     return totalSucceededGetRMNodeLabelsRetrieved.lastStat().mean();
@@ -1122,6 +1141,10 @@ public final class RouterMetrics {
     return numCheckUserAccessToQueueFailedRetrieved.value();
   }
 
+  /**
+   * @return current value of the refreshNodes failure gauge.
+   */
+  public int getNumRefreshNodesFailedRetrieved() {
+    MutableGaugeInt failures = numRefreshNodesFailedRetrieved;
+    return failures.value();
+  }
+
+  /** Current value of the getDelegationToken failure gauge. */
   public int getDelegationTokenFailedRetrieved() {
     return numGetDelegationTokenFailedRetrieved.value();
   }
@@ -1336,7 +1359,12 @@ public final class RouterMetrics {
 
+  /** Record a successful refreshQueues call and its duration. */
   public void succeededRefreshQueuesRetrieved(long duration) {
     totalSucceededRefreshQueuesRetrieved.add(duration);
-    getRefreshQueuesLatency.add(duration);
+    refreshQueuesLatency.add(duration);
+  }
+
+  /** Record a successful refreshNodes call and its duration. */
+  public void succeededRefreshNodesRetrieved(long duration) {
+    totalSucceededRefreshNodesRetrieved.add(duration);
+    refreshNodesLatency.add(duration);
   }
 
   public void succeededGetRMNodeLabelsRetrieved(long duration) {
@@ -1536,6 +1564,10 @@ public final class RouterMetrics {
     numCheckUserAccessToQueueFailedRetrieved.incr();
   }
 
+  /** Count one failed refreshNodes call. */
+  public void incrRefreshNodesFailedRetrieved() {
+    MutableGaugeInt failures = numRefreshNodesFailedRetrieved;
+    failures.incr();
+  }
+
+  /** Count one failed getDelegationToken call. */
   public void incrGetDelegationTokenFailedRetrieved() {
     numGetDelegationTokenFailedRetrieved.incr();
   }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
index 4564f8d8b85..22ace295c45 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
@@ -145,6 +145,23 @@ public class FederationRMAdminInterceptor extends 
AbstractRMAdminRequestIntercep
        + "is correct");
   }
 
+  /**
+   * Refresh queue requests.
+   *
+   * The Router supports refreshing all SubCluster queues at once,
+   * and also supports refreshing queues by SubCluster.
+   *
+   * @param request RefreshQueuesRequest, If subClusterId is not empty,
+   * it means that we want to refresh the queue of the specified subClusterId.
+   * If subClusterId is empty, it means we want to refresh all queues.
+   *
+   * @return RefreshQueuesResponse, There is no specific information in the 
response,
+   * as long as it is not empty, it means that the request is successful.
+   *
+   * @throws StandbyException exception thrown by non-active server.
+   * @throws YarnException indicates exceptions from yarn servers.
+   * @throws IOException io error occurs.
+   */
   @Override
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
       throws StandbyException, YarnException, IOException {
@@ -161,8 +178,9 @@ public class FederationRMAdminInterceptor extends 
AbstractRMAdminRequestIntercep
       RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
            new Class[] {RefreshQueuesRequest.class}, new Object[] {request});
 
+      String subClusterId = request.getSubClusterId();
       Collection<RefreshQueuesResponse> refreshQueueResps =
-          remoteMethod.invokeConcurrent(this, RefreshQueuesResponse.class);
+          remoteMethod.invokeConcurrent(this, RefreshQueuesResponse.class, 
subClusterId);
 
       // If we get the return result from refreshQueueResps,
       // it means that the call has been successful,
@@ -172,19 +190,66 @@ public class FederationRMAdminInterceptor extends 
AbstractRMAdminRequestIntercep
         routerMetrics.succeededRefreshQueuesRetrieved(stopTime - startTime);
         return RefreshQueuesResponse.newInstance();
       }
-    } catch (Exception e) {
+    } catch (YarnException e) {
       routerMetrics.incrRefreshQueuesFailedRetrieved();
-      RouterServerUtil.logAndThrowException("Unable to refreshQueue to 
exception.", e);
+      RouterServerUtil.logAndThrowException(e, "Unable to refreshQueue due to 
exception.");
     }
 
     routerMetrics.incrRefreshQueuesFailedRetrieved();
     throw new YarnException("Unable to refreshQueue.");
   }
 
+  /**
+   * Refresh nodes on the federated subClusters.
+   *
+   * The Router can refresh the nodes of all active SubClusters at once,
+   * or only the nodes of a single SubCluster.
+   *
+   * @param request RefreshNodesRequest. If its subClusterId is not blank,
+   * only the nodes of that subCluster are refreshed, and that subCluster must
+   * be active. If subClusterId is null or blank, the nodes of all active
+   * subClusters are refreshed.
+   *
+   * @return RefreshNodesResponse. The response carries no specific information;
+   * a non-null response means that the request succeeded.
+   *
+   * @throws StandbyException exception thrown by non-active server.
+   * @throws YarnException if the request is null, the requested subCluster is
+   * not active, the remote refresh fails, or no response is returned.
+   * @throws IOException io error occurs.
+   */
   @Override
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
       throws StandbyException, YarnException, IOException {
-    throw new NotImplementedException();
+
+    // parameter verification.
+    // We will not check whether the DecommissionType is empty,
+    // because this parameter has a default value at the proto level.
+    if (request == null) {
+      routerMetrics.incrRefreshNodesFailedRetrieved();
+      // logAndThrowException throws a YarnException here, so a null request
+      // never reaches the remote call below.
+      RouterServerUtil.logAndThrowException("Missing RefreshNodes request.", 
null);
+    }
+
+    // Call refreshNodes on the requested subCluster, or on all active
+    // subClusters when no subClusterId is given.
+    try {
+      long startTime = clock.getTime();
+      RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
+          new Class[] {RefreshNodesRequest.class}, new Object[] {request});
+
+      String subClusterId = request.getSubClusterId();
+      Collection<RefreshNodesResponse> refreshNodesResps =
+          remoteMethod.invokeConcurrent(this, RefreshNodesResponse.class, 
subClusterId);
+
+      // Any returned response counts as success; the responses carry no data.
+      if (CollectionUtils.isNotEmpty(refreshNodesResps)) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededRefreshNodesRetrieved(stopTime - startTime);
+        return RefreshNodesResponse.newInstance();
+      }
+    } catch (YarnException e) {
+      routerMetrics.incrRefreshNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException(e, "Unable to refreshNodes due to 
exception.");
+    }
+
+    // Reached when the remote call produced no responses.
+    routerMetrics.incrRefreshNodesFailedRetrieved();
+    throw new YarnException("Unable to refreshNodes.");
   }
 
   @Override
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java
index e1aa806ff86..1a5b038f19c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java
@@ -37,12 +37,12 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 
-
 /**
  * Class to define admin method, params and arguments.
  */
@@ -61,11 +61,15 @@ public class RMAdminProtocolMethod extends 
FederationMethodWrapper {
   }
 
+  /**
+   * Invoke the admin protocol method on behalf of the given interceptor.
+   *
+   * If subClusterId is not blank, only that subCluster is called (see
+   * {@link #invoke(Class, String)}); otherwise the call is dispatched through
+   * {@code invokeConcurrent(Class)}.
+   *
+   * @param interceptor interceptor whose configuration and RM admin proxies are used.
+   * @param clazz expected response type.
+   * @param subClusterId target subCluster id, or null/blank for all subClusters.
+   * @param <R> response type.
+   * @return the collected responses.
+   * @throws YarnException if the invocation fails.
+   */
   public <R> Collection<R> invokeConcurrent(FederationRMAdminInterceptor 
interceptor,
-      Class<R> clazz) throws YarnException {
+      Class<R> clazz, String subClusterId) throws YarnException {
     this.rmAdminInterceptor = interceptor;
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.configuration = interceptor.getConf();
-    return invokeConcurrent(clazz);
+    if (StringUtils.isNotBlank(subClusterId)) {
+      // invoke() derives the protocol method name from stack frame [3]
+      // (the caller of this method), so it must be called directly from here.
+      return invoke(clazz, subClusterId);
+    } else {
+      return invokeConcurrent(clazz);
+    }
   }
 
   @Override
@@ -107,7 +111,10 @@ public class RMAdminProtocolMethod extends 
FederationMethodWrapper {
           Pair<SubClusterId, Object> pair = future.get();
           subClusterId = pair.getKey();
           Object result = pair.getValue();
-          results.put(subClusterId, clazz.cast(result));
+          if (result != null) {
+            R rResult = clazz.cast(result);
+            results.put(subClusterId, rResult);
+          }
         } catch (InterruptedException | ExecutionException e) {
           Throwable cause = e.getCause();
           LOG.error("Cannot execute {} on {}: {}", methodName, subClusterId, 
cause.getMessage());
@@ -129,4 +136,51 @@ public class RMAdminProtocolMethod extends 
FederationMethodWrapper {
     // return result
     return results.values();
   }
+
+  /**
+   * Call the protocol method on a single, explicitly chosen subCluster.
+   *
+   * The protocol method name is read from the stack frame of the interceptor
+   * method that called
+   * {@code invokeConcurrent(FederationRMAdminInterceptor, Class, String)},
+   * so this method must only be reached directly through that entry point.
+   *
+   * @param clazz return type
+   * @param subClusterId subCluster Id
+   * @param <R> Generic R
+   * @return a singleton collection holding the subCluster's response.
+   * @throws YarnException if the subCluster is not active, if the remote call
+   *     fails (the exception raised by the RM is kept as the cause), or if the
+   *     subCluster returns a null response.
+   */
+  protected <R> Collection<R> invoke(Class<R> clazz, String subClusterId) throws YarnException {
+
+    // Get the method name to call. Frames: [0] getStackTrace, [1] invoke,
+    // [2] invokeConcurrent(interceptor, clazz, subClusterId), [3] the caller.
+    String methodName = Thread.currentThread().getStackTrace()[3].getMethodName();
+    this.setMethodName(methodName);
+
+    // Get Active SubClusters
+    Map<SubClusterId, SubClusterInfo> subClusterInfoMap =
+        federationFacade.getSubClusters(true);
+
+    // According to subCluster of string type, convert to SubClusterId type
+    SubClusterId subClusterIdKey = SubClusterId.newInstance(subClusterId);
+
+    // If the provided subCluster is not Active or does not exist,
+    // an exception will be returned directly.
+    if (!subClusterInfoMap.containsKey(subClusterIdKey)) {
+      throw new YarnException("subClusterId = " + subClusterId + " is not an active subCluster.");
+    }
+
+    // Call the method in the protocol and convert it according to clazz.
+    try {
+      ResourceManagerAdministrationProtocol protocol =
+          rmAdminInterceptor.getAdminRMProxyForSubCluster(subClusterIdKey);
+      Class<?>[] types = this.getTypes();
+      Object[] params = this.getParams();
+      Method method = ResourceManagerAdministrationProtocol.class.getMethod(methodName, types);
+      Object result = method.invoke(protocol, params);
+      if (result != null) {
+        return Collections.singletonList(clazz.cast(result));
+      }
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      // Method.invoke wraps whatever the RM threw; surface that real failure
+      // as the cause instead of the reflective wrapper.
+      Throwable cause = (e.getCause() != null) ? e.getCause() : e;
+      throw new YarnException("invoke Failed, An exception occurred in subClusterId = " +
+          subClusterId, cause);
+    } catch (Exception e) {
+      throw new YarnException("invoke Failed, An exception occurred in subClusterId = " +
+          subClusterId, e);
+    }
+    // Reached only when the call completed without an exception but returned
+    // null; report that instead of claiming an exception occurred.
+    throw new YarnException("invoke Failed, subClusterId = " + subClusterId +
+        " returned a null response.");
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
index 3aa61a68a39..e68e9dda3ca 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
@@ -19,8 +19,11 @@
 package org.apache.hadoop.yarn.server.router.rmadmin;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.yarn.api.records.DecommissionType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -31,7 +34,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -77,7 +79,7 @@ public class TestFederationRMAdminInterceptor extends 
BaseRouterRMAdminTest {
     subClusters = new ArrayList<>();
     try {
       for (int i = 0; i < NUM_SUBCLUSTER; i++) {
-        SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
+        SubClusterId sc = SubClusterId.newInstance("SC-" + i);
         stateStoreUtil.registerSubCluster(sc);
         subClusters.add(sc);
       }
@@ -114,8 +116,70 @@ public class TestFederationRMAdminInterceptor extends 
BaseRouterRMAdminTest {
   }
 
   @Test
-  public void testRefreshQueues() throws IOException, YarnException {
+  public void testRefreshQueues() throws Exception {
+    // We will test 2 cases:
+    // case 1, request is null.
+    // case 2, normal request.
+    // A null request must be rejected with a "Missing RefreshQueues request." exception.
+
+    // null request.
+    LambdaTestUtils.intercept(YarnException.class, "Missing RefreshQueues 
request.",
+        () -> interceptor.refreshQueues(null));
+
+    // normal request without a subClusterId: refreshes all subClusters.
     RefreshQueuesRequest request = RefreshQueuesRequest.newInstance();
     interceptor.refreshQueues(request);
   }
+
+  @Test
+  public void testSC1RefreshQueues() throws Exception {
+    // Two scenarios:
+    // 1. refreshing the queues of an existing, active subCluster (SC-1) succeeds;
+    // 2. refreshing the queues of an unknown subCluster is rejected.
+
+    RefreshQueuesRequest existingScRequest = RefreshQueuesRequest.newInstance("SC-1");
+    interceptor.refreshQueues(existingScRequest);
+
+    RefreshQueuesRequest unknownScRequest = RefreshQueuesRequest.newInstance("SC-NON");
+    LambdaTestUtils.intercept(YarnException.class,
+        "subClusterId = SC-NON is not an active subCluster.",
+        () -> interceptor.refreshQueues(unknownScRequest));
+  }
+
+  @Test
+  public void testRefreshNodes() throws Exception {
+    // Two scenarios:
+    // 1. a null request is rejected with "Missing RefreshNodes request.";
+    // 2. a regular request without a subClusterId succeeds.
+
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing RefreshNodes request.", () -> interceptor.refreshNodes(null));
+
+    RefreshNodesRequest normalRequest =
+        RefreshNodesRequest.newInstance(DecommissionType.NORMAL);
+    interceptor.refreshNodes(normalRequest);
+  }
+
+  @Test
+  public void testSC1RefreshNodes() throws Exception {
+    // Two scenarios:
+    // 1. refreshing the nodes of an existing, active subCluster (SC-1) succeeds;
+    // 2. refreshing the nodes of an unknown subCluster is rejected.
+
+    String unknownSubCluster = "SC-NON";
+    RefreshNodesRequest existingScRequest =
+        RefreshNodesRequest.newInstance(DecommissionType.NORMAL, 10, "SC-1");
+    RefreshNodesRequest unknownScRequest =
+        RefreshNodesRequest.newInstance(DecommissionType.NORMAL, 10, unknownSubCluster);
+
+    interceptor.refreshNodes(existingScRequest);
+    LambdaTestUtils.intercept(YarnException.class,
+        "subClusterId = SC-NON is not an active subCluster.",
+        () -> interceptor.refreshNodes(unknownScRequest));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org


Reply via email to