This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4190e98bc6e HADOOP-19624. [ABFS] Fixing Thread leak in
AbfsClientThrottlingAnalyzer (#7852)
4190e98bc6e is described below
commit 4190e98bc6ea4fd0f4a829f12a94d306445b5b3a
Author: mattkduran <[email protected]>
AuthorDate: Thu Jan 8 03:24:54 2026 -0800
HADOOP-19624. [ABFS] Fixing Thread leak in AbfsClientThrottlingAnalyzer
(#7852)
Contributed by Matt Duran
Reviewed by Anuj Modi, Anmol Asrani
Signed off by Anuj Modi <[email protected]>
---
.../hadoop/fs/azurebfs/services/AbfsClient.java | 3 +
.../services/AbfsClientThrottlingAnalyzer.java | 22 ++++-
.../services/AbfsClientThrottlingIntercept.java | 17 ++++
.../services/AbfsNoOpThrottlingIntercept.java | 12 +++
.../azurebfs/services/AbfsThrottlingIntercept.java | 4 +-
.../services/TestAbfsClientThrottlingAnalyzer.java | 103 +++++++++++++++++++++
6 files changed, 158 insertions(+), 3 deletions(-)
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 18e8183754d..2b21411c147 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -392,6 +392,9 @@ public void close() throws IOException {
if (abfsApacheHttpClient != null) {
abfsApacheHttpClient.close();
}
+ if (intercept != null) {
+ IOUtils.cleanupWithLogger(LOG, intercept);
+ }
if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG,
(Closeable) tokenProvider);
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
index f1eb3a2a774..fbe0cfcced3 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.io.Closeable;
+import java.io.IOException;
+
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,7 +37,7 @@
import static org.apache.hadoop.util.Time.now;
-class AbfsClientThrottlingAnalyzer {
+class AbfsClientThrottlingAnalyzer implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingAnalyzer.class);
private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
@@ -95,7 +98,7 @@ private AbfsClientThrottlingAnalyzer() {
}
/**
- * Resumes the timer if it was stopped.
+ * Resumes the timer if it was stopped previously.
*/
private void resumeTimer() {
blobMetrics = new AtomicReference<AbfsOperationMetrics>(
@@ -172,6 +175,21 @@ public boolean suspendIfNecessary() {
return false;
}
+ /**
+ * Closes the throttling analyzer and releases associated resources.
+ * This method cancels the internal timer, purges its task queue and then
+ * clears the timer reference.
+ * It is safe to call this method multiple times: once the reference has
+ * been cleared, later calls skip the cleanup.
+ * NOTE(review): the timer field is null after close; confirm that no other
+ * code path dereferences it once the analyzer has been closed.
+ * @throws IOException declared to match {@link Closeable#close()}; nothing
+ * in this body throws it
+ */
+@Override
+public void close() throws IOException {
+ if (timer != null) {
+ timer.cancel();
+ timer.purge();
+ timer = null;
+ }
+}
+
@VisibleForTesting
int getSleepDuration() {
return sleepDuration;
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
index 39aaf34db0d..da39231f55d 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.concurrent.locks.ReentrantLock;
@@ -223,4 +224,20 @@ private static long getContentLengthIfKnown(String range) {
}
return contentLength;
}
+
+  /**
+   * Closes the throttling intercept and releases associated resources.
+   * Both the read and the write throttling analyzers are closed. The write
+   * analyzer is closed from a finally block, so a failure while closing the
+   * read analyzer cannot leave the write analyzer open.
+   *
+   * @throws IOException if closing either analyzer fails
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      if (readThrottler != null) {
+        readThrottler.close();
+      }
+    } finally {
+      // Always attempted, even when closing the read analyzer threw.
+      if (writeThrottler != null) {
+        writeThrottler.close();
+      }
+    }
+  }
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
index 58e50592997..ef6c74cef0d 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.io.IOException;
+
/**
* Implementation of {@link AbfsThrottlingIntercept} that does not throttle
* the ABFS process.
@@ -40,4 +42,14 @@ public void updateMetrics(final AbfsRestOperationType operationType,
public void sendingRequest(final AbfsRestOperationType operationType,
final AbfsCounters abfsCounters) {
}
+
+ /**
+ * No-op implementation of close: this intercept has no resources to
+ * release, so the method body is empty.
+ *
+ * @throws IOException never thrown by this implementation; declared only
+ * to match the overridden close() signature
+ */
+ @Override
+ public void close() throws IOException {
+ // No resources to clean up in no-op implementation
+ }
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
index 72537771464..5d516a41b7a 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.io.Closeable;
+import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -26,7 +28,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public interface AbfsThrottlingIntercept {
+public interface AbfsThrottlingIntercept extends Closeable {
/**
* Updates the metrics for successful and failed read and write operations.
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
index 69dc0a607cb..69e6a587935 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
@@ -20,6 +20,10 @@
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -180,4 +184,103 @@ public void testManySuccessAndErrorsAndWaiting() {
sleep(10 * ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration());
}
+
+ /**
+ * Verifies that closing an analyzer removes the timer thread it created.
+ * The matching timer threads are counted before construction, after
+ * construction (one more expected) and after close() (initial count
+ * expected again).
+ * This validates the fix for HADOOP-19624.
+ * NOTE(review): the exact-count assertions assume no other analyzer timer
+ * threads start or stop while this test runs - confirm for parallel runs.
+ *
+ * @throws Exception if closing the analyzer fails
+ */
+ @Test
+ public void testAnalyzerTimerCleanup() throws Exception {
+ int initialTimerThreads = countAbfsTimerThreads();
+
+ // Create analyzer - should create one timer thread
+ AbfsClientThrottlingAnalyzer analyzer =
+ new AbfsClientThrottlingAnalyzer("test-cleanup", abfsConfiguration);
+
+ // Verify exactly one additional timer thread now exists
+ assertEquals(initialTimerThreads + 1, countAbfsTimerThreads(),
+ "Timer thread should be created");
+
+ // Close analyzer - should clean up timer thread
+ analyzer.close();
+
+ // Wait for cleanup to complete before counting again
+ sleep(1000);
+
+ // Verify the count is back to the value captured at the start
+ assertEquals(initialTimerThreads, countAbfsTimerThreads(),
+ "Timer thread should be cleaned up after close");
+ }
+
+ /**
+ * Verifies that close() can be called repeatedly on the same analyzer
+ * without throwing.
+ * After three close() calls the matching timer-thread count must be lower
+ * than the count captured before the first call.
+ * NOTE(review): the assertion only checks that the count dropped; it does
+ * not pin the drop to exactly one thread.
+ *
+ * @throws Exception if any close() call fails
+ */
+ @Test
+ public void testAnalyzerCloseIdempotent() throws Exception {
+ AbfsClientThrottlingAnalyzer analyzer =
+ new AbfsClientThrottlingAnalyzer("test-idempotent", abfsConfiguration);
+
+ int beforeClose = countAbfsTimerThreads();
+
+ // Close multiple times - should not throw exceptions
+ analyzer.close();
+ analyzer.close();
+ analyzer.close();
+
+ sleep(500);
+
+ // Count must have dropped relative to the pre-close snapshot
+ assertTrue(countAbfsTimerThreads() < beforeClose,
+ "Multiple close() calls should be safe");
+ }
+
+ /**
+ * Verifies cleanup when several analyzers exist at the same time.
+ * Three analyzers are created (three more matching timer threads expected)
+ * and then all closed, after which the count must equal the initial count.
+ * NOTE(review): the exact-count assertions assume no other analyzer timer
+ * threads start or stop while this test runs - confirm for parallel runs.
+ *
+ * @throws Exception if closing any analyzer fails
+ */
+ @Test
+ public void testMultipleAnalyzersCleanup() throws Exception {
+ int initialTimerThreads = countAbfsTimerThreads();
+
+ // Create multiple analyzers
+ AbfsClientThrottlingAnalyzer analyzer1 =
+ new AbfsClientThrottlingAnalyzer("test-multi-1", abfsConfiguration);
+ AbfsClientThrottlingAnalyzer analyzer2 =
+ new AbfsClientThrottlingAnalyzer("test-multi-2", abfsConfiguration);
+ AbfsClientThrottlingAnalyzer analyzer3 =
+ new AbfsClientThrottlingAnalyzer("test-multi-3", abfsConfiguration);
+
+ // Should have created 3 timer threads
+ assertEquals(initialTimerThreads + 3, countAbfsTimerThreads(),
+ "Should create 3 timer threads");
+
+ // Close all analyzers
+ analyzer1.close();
+ analyzer2.close();
+ analyzer3.close();
+
+ sleep(1000);
+
+ // Count must be back to the value captured at the start
+ assertEquals(initialTimerThreads, countAbfsTimerThreads(),
+ "All timer threads should be cleaned up");
+ }
+
+ /**
+ * Helper method to count ABFS timer threads.
+ */
+ private int countAbfsTimerThreads() {
+ java.lang.management.ThreadMXBean threadBean =
+ java.lang.management.ManagementFactory.getThreadMXBean();
+ long[] threadIds = threadBean.getAllThreadIds();
+
+ int count = 0;
+ for (long id : threadIds) {
+ java.lang.management.ThreadInfo info = threadBean.getThreadInfo(id);
+ if (info != null &&
+
info.getThreadName().contains("abfs-timer-client-throttling-analyzer")) {
+ count++;
+ }
+ }
+ return count;
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]