This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4af4997e112 YARN-11158. Support (Create/Renew/Cancel) DelegationToken 
API's for Federation. (#5104)
4af4997e112 is described below

commit 4af4997e1129d3b9bd7fde7ec8731d6e79093fd8
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Fri Dec 2 05:20:21 2022 +0800

    YARN-11158. Support (Create/Renew/Cancel) DelegationToken API's for 
Federation. (#5104)
---
 .../hadoop/yarn/server/router/RouterMetrics.java   |  97 ++++++++++++-
 .../yarn/server/router/RouterServerUtil.java       |  24 ++++
 .../clientrm/FederationClientInterceptor.java      |  97 ++++++++++++-
 .../yarn/server/router/TestRouterMetrics.java      |  80 ++++++++++-
 .../clientrm/TestFederationClientInterceptor.java  | 156 ++++++++++++++++++++-
 .../TestableFederationClientInterceptor.java       |  28 ++++
 6 files changed, 468 insertions(+), 14 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index b03aeda38b4..31d838d1b3e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -127,6 +127,12 @@ public final class RouterMetrics {
   private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved;
   @Metric("# of checkUserAccessToQueue failed to be retrieved")
   private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved;
+  @Metric("# of getDelegationToken failed to be retrieved")
+  private MutableGaugeInt numGetDelegationTokenFailedRetrieved;
+  @Metric("# of renewDelegationToken failed to be retrieved")
+  private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
+  @Metric("# of cancelDelegationToken failed to be retrieved")
+  private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -215,6 +221,12 @@ public final class RouterMetrics {
   private MutableRate totalSucceededGetRMNodeLabelsRetrieved;
   @Metric("Total number of successful Retrieved CheckUserAccessToQueue and 
latency(ms)")
   private MutableRate totalSucceededCheckUserAccessToQueueRetrieved;
+  @Metric("Total number of successful Retrieved GetDelegationToken and 
latency(ms)")
+  private MutableRate totalSucceededGetDelegationTokenRetrieved;
+  @Metric("Total number of successful Retrieved RenewDelegationToken and 
latency(ms)")
+  private MutableRate totalSucceededRenewDelegationTokenRetrieved;
+  @Metric("Total number of successful Retrieved CancelDelegationToken and 
latency(ms)")
+  private MutableRate totalSucceededCancelDelegationTokenRetrieved;
 
   /**
    * Provide quantile counters for all latencies.
@@ -262,6 +274,9 @@ public final class RouterMetrics {
   private MutableQuantiles getRefreshQueuesLatency;
   private MutableQuantiles getRMNodeLabelsLatency;
   private MutableQuantiles checkUserAccessToQueueLatency;
+  private MutableQuantiles getDelegationTokenLatency;
+  private MutableQuantiles renewDelegationTokenLatency;
+  private MutableQuantiles cancelDelegationTokenLatency;
 
   private static volatile RouterMetrics instance = null;
   private static MetricsRegistry registry;
@@ -423,6 +438,15 @@ public final class RouterMetrics {
 
     checkUserAccessToQueueLatency = 
registry.newQuantiles("checkUserAccessToQueueLatency",
         "latency of get apptimeouts timeouts", "ops", "latency", 10);
+
+    getDelegationTokenLatency = 
registry.newQuantiles("getDelegationTokenLatency",
+        "latency of get delegation token timeouts", "ops", "latency", 10);
+
+    renewDelegationTokenLatency = 
registry.newQuantiles("renewDelegationTokenLatency",
+       "latency of renew delegation token timeouts", "ops", "latency", 10);
+
+    cancelDelegationTokenLatency = 
registry.newQuantiles("cancelDelegationTokenLatency",
+        "latency of cancel delegation token timeouts", "ops", "latency", 10);
   }
 
   public static RouterMetrics getMetrics() {
@@ -655,10 +679,25 @@ public final class RouterMetrics {
   }
 
   @VisibleForTesting
-  public long getNumSucceededCheckUserAccessToQueueRetrievedRetrieved() {
+  public long getNumSucceededCheckUserAccessToQueueRetrieved() {
     return 
totalSucceededCheckUserAccessToQueueRetrieved.lastStat().numSamples();
   }
 
+  @VisibleForTesting
+  public long getNumSucceededGetDelegationTokenRetrieved() {
+    return totalSucceededGetDelegationTokenRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededRenewDelegationTokenRetrieved() {
+    return totalSucceededRenewDelegationTokenRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededCancelDelegationTokenRetrieved() {
+    return 
totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   public double getLatencySucceededAppsCreated() {
     return totalSucceededAppsCreated.lastStat().mean();
@@ -874,6 +913,21 @@ public final class RouterMetrics {
     return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().mean();
   }
 
+  @VisibleForTesting
+  public double getLatencySucceededGetDelegationTokenRetrieved() {
+    return totalSucceededGetDelegationTokenRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededRenewDelegationTokenRetrieved() {
+    return totalSucceededRenewDelegationTokenRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededCancelDelegationTokenRetrieved() {
+    return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
+  }
+
   @VisibleForTesting
   public int getAppsFailedCreated() {
     return numAppsFailedCreated.value();
@@ -1068,6 +1122,18 @@ public final class RouterMetrics {
     return numCheckUserAccessToQueueFailedRetrieved.value();
   }
 
+  public int getDelegationTokenFailedRetrieved() {
+    return numGetDelegationTokenFailedRetrieved.value();
+  }
+
+  public int getRenewDelegationTokenFailedRetrieved() {
+    return numRenewDelegationTokenFailedRetrieved.value();
+  }
+
+  public int getCancelDelegationTokenFailedRetrieved() {
+    return numCancelDelegationTokenFailedRetrieved.value();
+  }
+
   public void succeededAppsCreated(long duration) {
     totalSucceededAppsCreated.add(duration);
     getNewApplicationLatency.add(duration);
@@ -1283,6 +1349,21 @@ public final class RouterMetrics {
     checkUserAccessToQueueLatency.add(duration);
   }
 
+  public void succeededGetDelegationTokenRetrieved(long duration) {
+    totalSucceededGetDelegationTokenRetrieved.add(duration);
+    getDelegationTokenLatency.add(duration);
+  }
+
+  public void succeededRenewDelegationTokenRetrieved(long duration) {
+    totalSucceededRenewDelegationTokenRetrieved.add(duration);
+    renewDelegationTokenLatency.add(duration);
+  }
+
+  public void succeededCancelDelegationTokenRetrieved(long duration) {
+    totalSucceededCancelDelegationTokenRetrieved.add(duration);
+    cancelDelegationTokenLatency.add(duration);
+  }
+
   public void incrAppsFailedCreated() {
     numAppsFailedCreated.incr();
   }
@@ -1454,4 +1535,16 @@ public final class RouterMetrics {
   public void incrCheckUserAccessToQueueFailedRetrieved() {
     numCheckUserAccessToQueueFailedRetrieved.incr();
   }
-}
\ No newline at end of file
+
+  public void incrGetDelegationTokenFailedRetrieved() {
+    numGetDelegationTokenFailedRetrieved.incr();
+  }
+
+  public void incrRenewDelegationTokenFailedRetrieved() {
+    numRenewDelegationTokenFailedRetrieved.incr();
+  }
+
+  public void incrCancelDelegationTokenFailedRetrieved() {
+    numCancelDelegationTokenFailedRetrieved.incr();
+  }
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index 93818229dd1..8c880f25ddb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -24,10 +24,12 @@ import 
org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +38,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.EnumSet;
 import java.io.IOException;
 
 /**
@@ -470,6 +473,27 @@ public final class RouterServerUtil {
     }
   }
 
+  public static boolean isAllowedDelegationTokenOp() throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      return EnumSet.of(UserGroupInformation.AuthenticationMethod.KERBEROS,
+          UserGroupInformation.AuthenticationMethod.KERBEROS_SSL,
+          UserGroupInformation.AuthenticationMethod.CERTIFICATE)
+          .contains(UserGroupInformation.getCurrentUser()
+          .getRealAuthenticationMethod());
+    } else {
+      return true;
+    }
+  }
+
+  public static String getRenewerForToken(Token<RMDelegationTokenIdentifier> 
token)
+      throws IOException {
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    // we can always renew our own tokens
+    return loginUser.getUserName().equals(user.getUserName())
+        ? token.decodeIdentifier().getRenewer().toString() : 
user.getShortUserName();
+  }
+
   public static UserGroupInformation setupUser(final String userName) {
     UserGroupInformation user = null;
     try {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index cf457c70771..a50ea5bc423 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.router.clientrm;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.io.Text;
 import 
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -40,7 +41,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -118,9 +118,13 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRespo
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import 
org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
@@ -136,6 +140,7 @@ import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1392,19 +1397,103 @@ public class FederationClientInterceptor
   @Override
   public GetDelegationTokenResponse getDelegationToken(
       GetDelegationTokenRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getRenewer() == null) {
+      routerMetrics.incrGetDelegationTokenFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing getDelegationToken request or Renewer.", null);
+    }
+
+    try {
+      // Verify that the connection is kerberos authenticated
+      if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
+        // Not counted here: the catch block below records this failure exactly once.
+        throw new IOException(
+            "Delegation Token can be issued only with kerberos authentication.");
+      }
+
+      long startTime = clock.getTime();
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      Text owner = new Text(ugi.getUserName());
+      Text realUser = null;
+      if (ugi.getRealUser() != null) {
+        realUser = new Text(ugi.getRealUser().getUserName());
+      }
+
+      RMDelegationTokenIdentifier tokenIdentifier =
+          new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), realUser);
+      Token<RMDelegationTokenIdentifier> realRMDToken =
+          new Token<>(tokenIdentifier, this.getTokenSecretManager());
+
+      org.apache.hadoop.yarn.api.records.Token routerRMDTToken =
+          BuilderUtils.newDelegationToken(realRMDToken.getIdentifier(),
+              realRMDToken.getKind().toString(),
+              realRMDToken.getPassword(), realRMDToken.getService().toString());
+
+      long stopTime = clock.getTime();
+      routerMetrics.succeededGetDelegationTokenRetrieved((stopTime - startTime));
+      return GetDelegationTokenResponse.newInstance(routerRMDTToken);
+    } catch(IOException e) {
+      routerMetrics.incrGetDelegationTokenFailedRetrieved();
+      throw new YarnException(e);
+    }
   }
 
   @Override
   public RenewDelegationTokenResponse renewDelegationToken(
       RenewDelegationTokenRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    try {
+
+      if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
+        // Not counted here: the catch block below records this failure exactly once.
+        throw new IOException(
+            "Delegation Token can be renewed only with kerberos authentication");
+      }
+
+      long startTime = clock.getTime();
+      org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
+      Token<RMDelegationTokenIdentifier> token = new Token<>(
+          protoToken.getIdentifier().array(), protoToken.getPassword().array(),
+          new Text(protoToken.getKind()), new Text(protoToken.getService()));
+      String user = RouterServerUtil.getRenewerForToken(token);
+      long nextExpTime = this.getTokenSecretManager().renewToken(token, user);
+      RenewDelegationTokenResponse renewResponse =
+          Records.newRecord(RenewDelegationTokenResponse.class);
+      renewResponse.setNextExpirationTime(nextExpTime);
+      long stopTime = clock.getTime();
+      routerMetrics.succeededRenewDelegationTokenRetrieved((stopTime - startTime));
+      return renewResponse;
+
+    } catch (IOException e) {
+      routerMetrics.incrRenewDelegationTokenFailedRetrieved();
+      throw new YarnException(e);
+    }
   }
 
   @Override
   public CancelDelegationTokenResponse cancelDelegationToken(
       CancelDelegationTokenRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    try {
+      if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
+        // Not counted here: the catch block below records this failure exactly once.
+        throw new IOException(
+            "Delegation Token can be cancelled only with kerberos authentication");
+      }
+
+      long startTime = clock.getTime();
+      org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
+      Token<RMDelegationTokenIdentifier> token = new Token<>(
+          protoToken.getIdentifier().array(), protoToken.getPassword().array(),
+          new Text(protoToken.getKind()), new Text(protoToken.getService()));
+      String user = UserGroupInformation.getCurrentUser().getUserName();
+      this.getTokenSecretManager().cancelToken(token, user);
+      long stopTime = clock.getTime();
+      routerMetrics.succeededCancelDelegationTokenRetrieved((stopTime - startTime));
+      return Records.newRecord(CancelDelegationTokenResponse.class);
+    } catch (IOException e) {
+      routerMetrics.incrCancelDelegationTokenFailedRetrieved();
+      throw new YarnException(e);
+    }
   }
 
   @Override
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 828e5c69f35..9d5aeab5c6d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -519,10 +519,20 @@ public class TestRouterMetrics {
       metrics.incrGetRMNodeLabelsFailedRetrieved();
     }
 
-    public void getCheckUserAccessToQueueRetrieved() {
-      LOG.info("Mocked: failed checkUserAccessToQueueRetrieved call");
+    public void getCheckUserAccessToQueueFailed() {
+      LOG.info("Mocked: failed checkUserAccessToQueue call");
       metrics.incrCheckUserAccessToQueueFailedRetrieved();
     }
+
+    public void getDelegationTokenFailed() {
+      LOG.info("Mocked: failed getDelegationToken call");
+      metrics.incrGetDelegationTokenFailedRetrieved();
+    }
+
+    public void getRenewDelegationTokenFailed() {
+      LOG.info("Mocked: failed renewDelegationToken call");
+      metrics.incrRenewDelegationTokenFailedRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -743,6 +753,16 @@ public class TestRouterMetrics {
       LOG.info("Mocked: successful CheckUserAccessToQueue call with duration 
{}", duration);
       metrics.succeededCheckUserAccessToQueueRetrieved(duration);
     }
+
+    public void getGetDelegationTokenRetrieved(long duration) {
+      LOG.info("Mocked: successful GetDelegationToken call with duration {}", 
duration);
+      metrics.succeededGetDelegationTokenRetrieved(duration);
+    }
+
+    public void getRenewDelegationTokenRetrieved(long duration) {
+      LOG.info("Mocked: successful RenewDelegationToken call with duration 
{}", duration);
+      metrics.succeededRenewDelegationTokenRetrieved(duration);
+    }
   }
 
   @Test
@@ -1510,16 +1530,16 @@ public class TestRouterMetrics {
   }
 
   @Test
-  public void testCheckUserAccessToQueueRetrievedRetrieved() {
-    long totalGoodBefore = 
metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved();
+  public void testCheckUserAccessToQueueRetrieved() {
+    long totalGoodBefore = 
metrics.getNumSucceededCheckUserAccessToQueueRetrieved();
     goodSubCluster.getCheckUserAccessToQueueRetrieved(150);
     Assert.assertEquals(totalGoodBefore + 1,
-        metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved());
+        metrics.getNumSucceededCheckUserAccessToQueueRetrieved());
     Assert.assertEquals(150,
         metrics.getLatencySucceededCheckUserAccessToQueueRetrieved(), 
ASSERT_DOUBLE_DELTA);
     goodSubCluster.getCheckUserAccessToQueueRetrieved(300);
     Assert.assertEquals(totalGoodBefore + 2,
-        metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved());
+        metrics.getNumSucceededCheckUserAccessToQueueRetrieved());
     Assert.assertEquals(225,
         metrics.getLatencySucceededCheckUserAccessToQueueRetrieved(), 
ASSERT_DOUBLE_DELTA);
   }
@@ -1527,8 +1547,54 @@ public class TestRouterMetrics {
   @Test
   public void testCheckUserAccessToQueueRetrievedFailed() {
     long totalBadBefore = metrics.getCheckUserAccessToQueueFailedRetrieved();
-    badSubCluster.getCheckUserAccessToQueueRetrieved();
+    badSubCluster.getCheckUserAccessToQueueFailed();
     Assert.assertEquals(totalBadBefore + 1,
         metrics.getCheckUserAccessToQueueFailedRetrieved());
   }
+
+  @Test
+  public void testGetDelegationTokenRetrieved() {
+    long totalGoodBefore = 
metrics.getNumSucceededGetDelegationTokenRetrieved();
+    goodSubCluster.getGetDelegationTokenRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededGetDelegationTokenRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededGetDelegationTokenRetrieved(), 
ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getGetDelegationTokenRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededGetDelegationTokenRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededGetDelegationTokenRetrieved(), 
ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testGetDelegationTokenRetrievedFailed() {
+    long totalBadBefore = metrics.getDelegationTokenFailedRetrieved();
+    badSubCluster.getDelegationTokenFailed();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getDelegationTokenFailedRetrieved());
+  }
+
+  @Test
+  public void testRenewDelegationTokenRetrieved() {
+    long totalGoodBefore = 
metrics.getNumSucceededRenewDelegationTokenRetrieved();
+    goodSubCluster.getRenewDelegationTokenRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededRenewDelegationTokenRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededRenewDelegationTokenRetrieved(), 
ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getRenewDelegationTokenRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededRenewDelegationTokenRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededRenewDelegationTokenRetrieved(), 
ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testRenewDelegationTokenRetrievedFailed() {
+    long totalBadBefore = metrics.getRenewDelegationTokenFailedRetrieved();
+    badSubCluster.getRenewDelegationTokenFailed();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getRenewDelegationTokenFailedRetrieved());
+  }
 }
\ No newline at end of file
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 38f571c288b..2488fc73b07 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.router.clientrm;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -32,6 +33,7 @@ import java.util.stream.Collectors;
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
@@ -100,6 +102,12 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -123,10 +131,13 @@ import 
org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
 import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import 
org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
 import 
org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@@ -138,6 +149,9 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSyst
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import 
org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
@@ -170,7 +184,7 @@ public class TestFederationClientInterceptor extends 
BaseRouterClientRMTest {
   private final static long DEFAULT_DURATION = 10 * 60 * 1000;
 
   @Override
-  public void setUp() {
+  public void setUp() throws IOException {
     super.setUpConfig();
     interceptor = new TestableFederationClientInterceptor();
 
@@ -181,6 +195,11 @@ public class TestFederationClientInterceptor extends 
BaseRouterClientRMTest {
 
     interceptor.setConf(this.getConf());
     interceptor.init(user);
+    RouterDelegationTokenSecretManager tokenSecretManager =
+        interceptor.createRouterRMDelegationTokenSecretManager(this.getConf());
+
+    tokenSecretManager.startThreads();
+    interceptor.setTokenSecretManager(tokenSecretManager);
 
     subClusters = new ArrayList<>();
 
@@ -230,6 +249,7 @@ public class TestFederationClientInterceptor extends 
BaseRouterClientRMTest {
     conf.setInt("yarn.scheduler.maximum-allocation-mb", 100 * 1024);
     conf.setInt("yarn.scheduler.maximum-allocation-vcores", 100);
 
+    conf.setBoolean("hadoop.security.authentication", true);
     return conf;
   }
 
@@ -1550,4 +1570,138 @@ public class TestFederationClientInterceptor extends 
BaseRouterClientRMTest {
     int minThreads2 = interceptor.getNumMaxThreads(this.getConf());
     Assert.assertEquals(8, minThreads2);
   }
+
+  @Test
+  public void testGetDelegationToken() throws IOException, YarnException {
+
+    // This unit test checks
+    // that the GetDelegationToken method behaves as expected.
+    //
+    // 1. Request a DelegationToken for renewer1;
+    // the Router returns the DelegationToken of the user, and the KIND of the 
token is
+    // RM_DELEGATION_TOKEN
+    //
+    // 2. The token stays compatible with RMDelegationTokenIdentifier,
+    // so it can be decoded into an RMDelegationTokenIdentifier.
+    //
+    // 3. maxDate must equal issueDate + max lifetime, and the renew date
+    // kept in the StateStore must equal issueDate + renew interval.
+
+    // Step1. Request a DelegationToken for renewer1.
+    // Neither the response nor the delegationToken may be null.
+    GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
+    when(request.getRenewer()).thenReturn("renewer1");
+    GetDelegationTokenResponse response = 
interceptor.getDelegationToken(request);
+    Assert.assertNotNull(response);
+    Token delegationToken = response.getRMDelegationToken();
+    Assert.assertNotNull(delegationToken);
+    Assert.assertEquals("RM_DELEGATION_TOKEN", delegationToken.getKind());
+
+    // Step2. Decode the returned Token into an RMDelegationTokenIdentifier.
+    org.apache.hadoop.security.token.Token<RMDelegationTokenIdentifier> token =
+        ConverterUtils.convertFromYarn(delegationToken, (Text) null);
+    RMDelegationTokenIdentifier rMDelegationTokenIdentifier = 
token.decodeIdentifier();
+    Assert.assertNotNull(rMDelegationTokenIdentifier);
+
+    // Step3. Verify the returned data of the token.
+    String renewer = rMDelegationTokenIdentifier.getRenewer().toString();
+    long issueDate = rMDelegationTokenIdentifier.getIssueDate();
+    long maxDate = rMDelegationTokenIdentifier.getMaxDate();
+    Assert.assertEquals("renewer1", renewer);
+
+    long tokenMaxLifetime = this.getConf().getLong(
+        YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+        YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+    Assert.assertEquals(issueDate + tokenMaxLifetime, maxDate);
+
+    RouterRMDTSecretManagerState managerState = 
stateStore.getRouterRMSecretManagerState();
+    Assert.assertNotNull(managerState);
+
+    Map<RMDelegationTokenIdentifier, Long> delegationTokenState = 
managerState.getTokenState();
+    Assert.assertNotNull(delegationTokenState);
+    
Assert.assertTrue(delegationTokenState.containsKey(rMDelegationTokenIdentifier));
+
+    long tokenRenewInterval = this.getConf().getLong(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+    long renewDate = delegationTokenState.get(rMDelegationTokenIdentifier);
+    Assert.assertEquals(issueDate + tokenRenewInterval, renewDate);
+  }
+
+  @Test
+  public void testRenewDelegationToken() throws IOException, YarnException {
+
+    // This unit test checks
+    // that the RenewDelegationToken method behaves as expected.
+    // 1. Call GetDelegationToken to apply for delegationToken.
+    // 2. Call renewDelegationToken to refresh delegationToken.
+    // By looking at the code of 
AbstractDelegationTokenSecretManager#renewToken,
+    // we know that renewTime is calculated as Math.min(id.getMaxDate(), now + 
tokenRenewInterval)
+    // so renewTime will be less than or equal to maxDate.
+    // 3. We will compare whether the expirationTime returned to the
+    // client is consistent with the renewDate in the stateStore.
+
+    // Step1. Call GetDelegationToken to apply for delegationToken.
+    GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
+    when(request.getRenewer()).thenReturn("renewer2");
+    GetDelegationTokenResponse response = 
interceptor.getDelegationToken(request);
+    Assert.assertNotNull(response);
+    Token delegationToken = response.getRMDelegationToken();
+
+    org.apache.hadoop.security.token.Token<RMDelegationTokenIdentifier> token =
+        ConverterUtils.convertFromYarn(delegationToken, (Text) null);
+    RMDelegationTokenIdentifier rMDelegationTokenIdentifier = 
token.decodeIdentifier();
+    String renewer = rMDelegationTokenIdentifier.getRenewer().toString();
+    long maxDate = rMDelegationTokenIdentifier.getMaxDate();
+    Assert.assertEquals("renewer2", renewer);
+
+    // Step2. Call renewDelegationToken to refresh delegationToken.
+    RenewDelegationTokenRequest renewRequest = 
Records.newRecord(RenewDelegationTokenRequest.class);
+    renewRequest.setDelegationToken(delegationToken);
+    RenewDelegationTokenResponse renewResponse = 
interceptor.renewDelegationToken(renewRequest);
+    Assert.assertNotNull(renewResponse);
+
+    long expDate = renewResponse.getNextExpirationTime();
+    Assert.assertTrue(expDate <= maxDate);
+
+    // Step3. Compare whether the expirationTime returned to
+    // the client is consistent with the renewDate in the stateStore
+    RouterRMDTSecretManagerState managerState = 
stateStore.getRouterRMSecretManagerState();
+    Map<RMDelegationTokenIdentifier, Long> delegationTokenState = 
managerState.getTokenState();
+    Assert.assertNotNull(delegationTokenState);
+    
Assert.assertTrue(delegationTokenState.containsKey(rMDelegationTokenIdentifier));
+    long renewDate = delegationTokenState.get(rMDelegationTokenIdentifier);
+    Assert.assertEquals(expDate, renewDate);
+  }
+
+  @Test
+  public void testCancelDelegationToken() throws IOException, YarnException {
+
+    // This unit test checks
+    // that the CancelDelegationToken method behaves as expected.
+    // 1. Call GetDelegationToken to apply for delegationToken.
+    // 2. Call CancelDelegationToken to cancel delegationToken.
+    // 3. Query the data in the StateStore and confirm that the token has 
been deleted.
+
+    // Step1. Call GetDelegationToken to apply for delegationToken.
+    GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
+    when(request.getRenewer()).thenReturn("renewer3");
+    GetDelegationTokenResponse response = 
interceptor.getDelegationToken(request);
+    Assert.assertNotNull(response);
+    Token delegationToken = response.getRMDelegationToken();
+
+    // Step2. Call CancelDelegationToken to cancel delegationToken.
+    CancelDelegationTokenRequest cancelTokenRequest =
+        CancelDelegationTokenRequest.newInstance(delegationToken);
+    CancelDelegationTokenResponse cancelTokenResponse =
+        interceptor.cancelDelegationToken(cancelTokenRequest);
+    Assert.assertNotNull(cancelTokenResponse);
+
+    // Step3. Query the data in the StateStore and confirm that the token 
has been deleted.
+    // delegationTokenState should now be empty (assumes a fresh store; verify).
+    RouterRMDTSecretManagerState managerState = 
stateStore.getRouterRMSecretManagerState();
+    Map<RMDelegationTokenIdentifier, Long> delegationTokenState = 
managerState.getTokenState();
+    Assert.assertNotNull(delegationTokenState);
+    Assert.assertEquals(0, delegationTokenState.size());
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
index 8279899e387..c8c647a0d22 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -28,8 +28,10 @@ import java.util.Set;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@@ -38,6 +40,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -51,6 +54,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSyst
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import 
org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -216,4 +220,28 @@ public class TestableFederationClientInterceptor
     mockRMs.clear();
     super.shutdown();
   }
+
+  public RouterDelegationTokenSecretManager 
createRouterRMDelegationTokenSecretManager(
+      Configuration conf) {
+    // Read the four RM delegation-token settings from conf, then build the manager.
+    long secretKeyInterval = conf.getLong(
+        YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+        YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+
+    long tokenMaxLifetime = conf.getLong(
+        YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+        YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+
+    long tokenRenewInterval = conf.getLong(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+
+    long removeScanInterval = conf.getTimeDuration(
+        YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY,
+        YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    return new RouterDelegationTokenSecretManager(secretKeyInterval,
+        tokenMaxLifetime, tokenRenewInterval, removeScanInterval);
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org


Reply via email to