This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4190e98bc6e HADOOP-19624. [ABFS] Fixing Thread leak in 
AbfsClientThrottlingAnalyzer (#7852)
4190e98bc6e is described below

commit 4190e98bc6ea4fd0f4a829f12a94d306445b5b3a
Author: mattkduran <[email protected]>
AuthorDate: Thu Jan 8 03:24:54 2026 -0800

    HADOOP-19624. [ABFS] Fixing Thread leak in AbfsClientThrottlingAnalyzer 
(#7852)
    
    Contributed by Matt Duran
    Reviewed by Anuj Modi, Anmol Asrani
    
    Signed off by Anuj Modi<[email protected]>
---
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |   3 +
 .../services/AbfsClientThrottlingAnalyzer.java     |  22 ++++-
 .../services/AbfsClientThrottlingIntercept.java    |  17 ++++
 .../services/AbfsNoOpThrottlingIntercept.java      |  12 +++
 .../azurebfs/services/AbfsThrottlingIntercept.java |   4 +-
 .../services/TestAbfsClientThrottlingAnalyzer.java | 103 +++++++++++++++++++++
 6 files changed, 158 insertions(+), 3 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 18e8183754d..2b21411c147 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -392,6 +392,9 @@ public void close() throws IOException {
     if (abfsApacheHttpClient != null) {
       abfsApacheHttpClient.close();
     }
+    if (intercept != null) {
+      IOUtils.cleanupWithLogger(LOG, intercept);
+    }
     if (tokenProvider instanceof Closeable) {
       IOUtils.cleanupWithLogger(LOG,
           (Closeable) tokenProvider);
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
index f1eb3a2a774..fbe0cfcced3 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,7 +37,7 @@
 
 import static org.apache.hadoop.util.Time.now;
 
-class AbfsClientThrottlingAnalyzer {
+class AbfsClientThrottlingAnalyzer implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsClientThrottlingAnalyzer.class);
   private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
@@ -95,7 +98,7 @@ private AbfsClientThrottlingAnalyzer() {
   }
 
   /**
-   * Resumes the timer if it was stopped.
+   * Resumes the timer if it was stopped previously.
    */
   private void resumeTimer() {
     blobMetrics = new AtomicReference<AbfsOperationMetrics>(
@@ -172,6 +175,21 @@ public boolean suspendIfNecessary() {
     return false;
   }
 
+  /**
+ * Closes the throttling analyzer and releases associated resources.
+ * This method cancels the internal timer and cleans up any pending timer 
tasks.
+ * It is safe to call this method multiple times.
+ * @throws IOException if an I/O error occurs during cleanup
+ */
+@Override
+public void close() throws IOException {
+  if (timer != null) {
+    timer.cancel();
+    timer.purge();
+    timer = null;
+  }
+}
+
   @VisibleForTesting
   int getSleepDuration() {
     return sleepDuration;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
index 39aaf34db0d..da39231f55d 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -223,4 +224,20 @@ private static long getContentLengthIfKnown(String range) {
     }
     return contentLength;
   }
+
+  /**
+   * Closes the throttling intercept and releases associated resources.
+   * This method closes both the read and write throttling analyzers.
+   *
+   * @throws IOException if an I/O error occurs during cleanup
+   */
+  @Override
+  public void close() throws IOException {
+    if (readThrottler != null) {
+      readThrottler.close();
+    }
+    if (writeThrottler != null) {
+      writeThrottler.close();
+    }
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
index 58e50592997..ef6c74cef0d 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.IOException;
+
 /**
  * Implementation of {@link AbfsThrottlingIntercept} that does not throttle
  * the ABFS process.
@@ -40,4 +42,14 @@ public void updateMetrics(final AbfsRestOperationType 
operationType,
   public void sendingRequest(final AbfsRestOperationType operationType,
       final AbfsCounters abfsCounters) {
   }
+
+  /**
+   * No-op implementation of close method.
+   *
+   * @throws IOException if an I/O error occurs during cleanup
+   */
+  @Override
+  public void close() throws IOException {
+    // No resources to clean up in no-op implementation
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
index 72537771464..5d516a41b7a 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.Closeable;
+import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -26,7 +28,7 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public interface AbfsThrottlingIntercept {
+public interface AbfsThrottlingIntercept extends Closeable {
 
   /**
    * Updates the metrics for successful and failed read and write operations.
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
index 69dc0a607cb..69e6a587935 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
@@ -20,6 +20,10 @@
 
 import java.io.IOException;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -180,4 +184,103 @@ public void testManySuccessAndErrorsAndWaiting() {
     sleep(10 * ANALYSIS_PERIOD);
     validate(0, analyzer.getSleepDuration());
   }
+
+  /**
+   * Test that timer thread is properly cleaned up when analyzer is closed.
+   * This validates the fix for HADOOP-19624.
+   */
+  @Test
+  public void testAnalyzerTimerCleanup() throws Exception {
+    int initialTimerThreads = countAbfsTimerThreads();
+
+    // Create analyzer - should create one timer thread
+    AbfsClientThrottlingAnalyzer analyzer =
+        new AbfsClientThrottlingAnalyzer("test-cleanup", abfsConfiguration);
+
+    // Verify timer thread was created
+    assertEquals(initialTimerThreads + 1, countAbfsTimerThreads(),
+        "Timer thread should be created");
+
+    // Close analyzer - should clean up timer thread
+    analyzer.close();
+
+    // Wait for cleanup to complete
+    sleep(1000);
+
+    // Verify timer thread was cleaned up
+    assertEquals(initialTimerThreads, countAbfsTimerThreads(),
+        "Timer thread should be cleaned up after close");
+  }
+
+  /**
+   * Test that close() is idempotent and can be called multiple times.
+   */
+  @Test
+  public void testAnalyzerCloseIdempotent() throws Exception {
+    AbfsClientThrottlingAnalyzer analyzer =
+        new AbfsClientThrottlingAnalyzer("test-idempotent", abfsConfiguration);
+
+    int beforeClose = countAbfsTimerThreads();
+
+    // Close multiple times - should not throw exceptions
+    analyzer.close();
+    analyzer.close();
+    analyzer.close();
+
+    sleep(500);
+
+    // Should only clean up once
+    assertTrue(countAbfsTimerThreads() < beforeClose,
+        "Multiple close() calls should be safe");
+  }
+
+  /**
+   * Test cleanup with multiple analyzers to ensure no interference.
+   */
+  @Test
+  public void testMultipleAnalyzersCleanup() throws Exception {
+    int initialTimerThreads = countAbfsTimerThreads();
+
+    // Create multiple analyzers
+    AbfsClientThrottlingAnalyzer analyzer1 =
+        new AbfsClientThrottlingAnalyzer("test-multi-1", abfsConfiguration);
+    AbfsClientThrottlingAnalyzer analyzer2 =
+        new AbfsClientThrottlingAnalyzer("test-multi-2", abfsConfiguration);
+    AbfsClientThrottlingAnalyzer analyzer3 =
+        new AbfsClientThrottlingAnalyzer("test-multi-3", abfsConfiguration);
+
+    // Should have created 3 timer threads
+    assertEquals(initialTimerThreads + 3, countAbfsTimerThreads(),
+        "Should create 3 timer threads");
+
+    // Close all analyzers
+    analyzer1.close();
+    analyzer2.close();
+    analyzer3.close();
+
+    sleep(1000);
+
+    // All timer threads should be cleaned up
+    assertEquals(initialTimerThreads, countAbfsTimerThreads(),
+        "All timer threads should be cleaned up");
+  }
+
+  /**
+   * Helper method to count ABFS timer threads.
+   */
+  private int countAbfsTimerThreads() {
+    java.lang.management.ThreadMXBean threadBean =
+        java.lang.management.ManagementFactory.getThreadMXBean();
+    long[] threadIds = threadBean.getAllThreadIds();
+
+    int count = 0;
+    for (long id : threadIds) {
+      java.lang.management.ThreadInfo info = threadBean.getThreadInfo(id);
+      if (info != null &&
+          
info.getThreadName().contains("abfs-timer-client-throttling-analyzer")) {
+        count++;
+      }
+    }
+    return count;
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to