This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 200a2430458 HDDS-13939. Potential channel leak in
StreamingClient.stream() method (#9304)
200a2430458 is described below
commit 200a24304583d369faeb91325afdcd5ba4c6e99a
Author: GUAN-HAO HUANG <[email protected]>
AuthorDate: Tue Nov 18 00:03:43 2025 +0800
HDDS-13939. Potential channel leak in StreamingClient.stream() method
(#9304)
---
.../ozone/container/stream/StreamingClient.java | 18 +++++++---
.../container/stream/TestStreamingServer.java | 38 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 4 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
index 55233e749bc..d4aff71b54d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
@@ -86,11 +86,17 @@ public void stream(String id) {
}
public void stream(String id, long timeout, TimeUnit unit) {
+ Channel channel = null;
try {
- Channel channel = bootstrap.connect(host, port).sync().channel();
- channel.writeAndFlush(id + "\n")
- .await(timeout, unit);
- channel.closeFuture().await(timeout, unit);
+ channel = bootstrap.connect(host, port).sync().channel();
+ boolean writeSuccess = channel.writeAndFlush(id + "\n").await(timeout,
unit);
+ if (!writeSuccess) {
+ throw new StreamingException("Failed to write id " + id + ": timed out
" + timeout + " " + unit);
+ }
+ boolean closeSuccess = channel.closeFuture().await(timeout, unit);
+ if (!closeSuccess) {
+ throw new StreamingException("Failed to close channel for id " + id +
": timed out " + timeout + " " + unit);
+ }
if (!dirstreamClientHandler.isAtTheEnd()) {
throw new StreamingException("Streaming is failed. Not all files " +
"are streamed. Please check the log of the server." +
@@ -100,6 +106,10 @@ public void stream(String id, long timeout, TimeUnit unit)
{
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StreamingException(e);
+ } finally {
+ if (channel != null && channel.isActive()) {
+ channel.close();
+ }
}
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java
index 47e280e97ae..7de9e98d3f3 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java
@@ -159,6 +159,44 @@ public Map<String, Path> getFilesToStream(String id)
}
+ @Test
+ public void testChannelLeakOnTimeoutWithoutClose() throws Exception {
+ Files.createDirectories(sourceDir.resolve(SUBDIR));
+ Files.write(sourceDir.resolve(SUBDIR).resolve("file1"), CONTENT);
+
+ try (StreamingServer server = new StreamingServer(
+ new DirectoryServerSource(sourceDir) {
+ @Override
+ public Map<String, Path> getFilesToStream(String id)
+ throws InterruptedException {
+ // Delay to cause timeout
+ Thread.sleep(3000L);
+ return super.getFilesToStream(id);
+ }
+ }, 0)) {
+ server.start();
+
+ // Create client WITHOUT try-with-resources to simulate resource leak
+ StreamingClient client = new StreamingClient("localhost",
server.getPort(),
+ new DirectoryServerDestination(destDir));
+
+ try {
+ client.stream(SUBDIR, 1L, TimeUnit.SECONDS);
+ // Should not reach here
+ throw new AssertionError("Expected exception, but none was thrown");
+ } catch (StreamingException e) {
+ String message = e.getMessage();
+ if (!message.contains("timed out") && !message.contains("timeout")) {
+ throw new AssertionError(
+ "Expected timeout exception, but got: " + message + ". " +
+ "This indicates the bug: await() returned false but we didn't
check it. " +
+ "Channel may be leaking.");
+ }
+ }
+ client.close();
+ }
+ }
+
private void streamDir(String subdir) {
try (StreamingServer server = new StreamingServer(
new DirectoryServerSource(sourceDir), 0)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]