This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 11af08d67a8 YARN-11489. Fix memory leak of DelegationTokenRenewer futures in DelegationTokenRenewerPoolTracker. (#5629). Contributed by Chun Chen.
11af08d67a8 is described below

commit 11af08d67a8f1006c0ff270d7b314d3801d280f3
Author: Chun Chen <chenchun.f...@gmail.com>
AuthorDate: Sun May 14 21:38:04 2023 +0800

    YARN-11489. Fix memory leak of DelegationTokenRenewer futures in DelegationTokenRenewerPoolTracker. (#5629). Contributed by Chun Chen.
    
    Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org>
---
 .../security/DelegationTokenRenewer.java           | 84 ++++++++++++++--------
 1 file changed, 56 insertions(+), 28 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index 8040ce9771a..be95572f92a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -123,8 +123,8 @@ public class DelegationTokenRenewer extends AbstractService {
   private long tokenRenewerThreadTimeout;
   private long tokenRenewerThreadRetryInterval;
   private int tokenRenewerThreadRetryMaxAttempts;
-  private final Map<DelegationTokenRenewerEvent, Future<?>> futures =
-      new ConcurrentHashMap<>();
+  private final LinkedBlockingQueue<DelegationTokenRenewerFuture> futures =
+      new LinkedBlockingQueue<>();
   private boolean delegationTokenRenewerPoolTrackerFlag = true;
 
   // this config is supposedly not used by end-users.
@@ -227,7 +227,7 @@ public class DelegationTokenRenewer extends AbstractService {
       if (isServiceStarted) {
         Future<?> future =
             renewerService.submit(new DelegationTokenRenewerRunnable(evt));
-        futures.put(evt, future);
+        futures.add(new DelegationTokenRenewerFuture(evt, future));
       } else {
         pendingEventQueue.add(evt);
         int qSize = pendingEventQueue.size();
@@ -998,33 +998,35 @@ public class DelegationTokenRenewer extends AbstractService {
     @Override
     public void run() {
       while (true) {
-        for (Map.Entry<DelegationTokenRenewerEvent, Future<?>> entry : futures
-            .entrySet()) {
-          DelegationTokenRenewerEvent evt = entry.getKey();
-          Future<?> future = entry.getValue();
-          try {
-            future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS);
-          } catch (TimeoutException e) {
-
-            // Cancel thread and retry the same event in case of timeout
-            if (future != null && !future.isDone() && !future.isCancelled()) {
-              future.cancel(true);
-              futures.remove(evt);
-              if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) {
-                renewalTimer.schedule(
-                    getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt),
-                    tokenRenewerThreadRetryInterval);
-              } else {
-                LOG.info(
-                    "Exhausted max retry attempts {} in token renewer "
-                        + "thread for {}",
-                    tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId());
-              }
+        DelegationTokenRenewerFuture dtrf;
+        try {
+          dtrf = futures.take();
+        } catch (InterruptedException e) {
+          LOG.debug("DelegationTokenRenewer pool tracker interrupted");
+          return;
+        }
+        DelegationTokenRenewerEvent evt = dtrf.getEvt();
+        Future<?> future = dtrf.getFuture();
+        try {
+          future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          // Cancel thread and retry the same event in case of timeout.
+          if (!future.isDone() && !future.isCancelled()) {
+            future.cancel(true);
+            if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) {
+              renewalTimer.schedule(
+                  getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt),
+                  tokenRenewerThreadRetryInterval);
+            } else {
+              LOG.info(
+                  "Exhausted max retry attempts {} in token renewer "
+                      + "thread for {}",
+                  tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId());
             }
-          } catch (Exception e) {
-            LOG.info("Problem in submitting renew tasks in token renewer "
-                + "thread.", e);
           }
+        } catch (Exception e) {
+          LOG.info("Problem in submitting renew tasks in token renewer "
+              + "thread.", e);
         }
       }
     }
@@ -1192,6 +1194,32 @@ public class DelegationTokenRenewer extends AbstractService {
     }
   }
 
+  private static class DelegationTokenRenewerFuture {
+    private DelegationTokenRenewerEvent evt;
+    private Future<?> future;
+    DelegationTokenRenewerFuture(DelegationTokenRenewerEvent evt,
+        Future<?> future) {
+      this.future = future;
+      this.evt = evt;
+    }
+
+    public DelegationTokenRenewerEvent getEvt() {
+      return evt;
+    }
+
+    public void setEvt(DelegationTokenRenewerEvent evt) {
+      this.evt = evt;
+    }
+
+    public Future<?> getFuture() {
+      return future;
+    }
+
+    public void setFuture(Future<?> future) {
+      this.future = future;
+    }
+  }
+
   // only for testing
   protected ConcurrentMap<Token<?>, DelegationTokenToRenew> getAllTokens() {
     return allTokens;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to