This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9bfb7ab8c42 Fix resource leak in KafkaIO GCS truststore file download (#37681)
9bfb7ab8c42 is described below
commit 9bfb7ab8c42ede75309e117d19d2a729ea550855
Author: ZIHAN DAI <[email protected]>
AuthorDate: Tue Feb 24 02:48:14 2026 +1100
Fix resource leak in KafkaIO GCS truststore file download (#37681)
Convert manual resource close to try-with-resources in
identityOrGcsToLocalFile to prevent leaking ReadableByteChannel,
FileOutputStream, and WritableByteChannel when an IOException
occurs during the copy loop. Also preserve the original IOException
as the cause of the IllegalArgumentException.
---
.../io/kafka/KafkaReadSchemaTransformProvider.java | 31 +++++++++-------------
1 file changed, 12 insertions(+), 19 deletions(-)
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 74f9b147bbd..c5764b39bc6 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -402,31 +402,24 @@ public class KafkaReadSchemaTransformProvider
LOG.info(
"Downloading {} into local filesystem ({})", configStr,
localFile.toAbsolutePath());
// TODO(pabloem): Only copy if file does not exist.
- ReadableByteChannel channel =
-
FileSystems.open(FileSystems.match(configStr).metadata().get(0).resourceId());
- FileOutputStream outputStream = new
FileOutputStream(localFile.toFile());
-
- // Create a WritableByteChannel to write data to the
FileOutputStream
- WritableByteChannel outputChannel =
Channels.newChannel(outputStream);
-
- // Read data from the ReadableByteChannel and write it to the
WritableByteChannel
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- while (channel.read(buffer) != -1) {
- buffer.flip();
- outputChannel.write(buffer);
- buffer.compact();
+ try (ReadableByteChannel channel =
+
FileSystems.open(FileSystems.match(configStr).metadata().get(0).resourceId());
+ FileOutputStream outputStream = new
FileOutputStream(localFile.toFile());
+ WritableByteChannel outputChannel =
Channels.newChannel(outputStream)) {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ while (channel.read(buffer) != -1) {
+ buffer.flip();
+ outputChannel.write(buffer);
+ buffer.compact();
+ }
}
-
- // Close the channels and the output stream
- channel.close();
- outputChannel.close();
- outputStream.close();
return localFile.toAbsolutePath().toString();
} catch (IOException e) {
throw new IllegalArgumentException(
String.format(
"Unable to fetch file %s to be used locally to create a
Kafka Consumer.",
- configStr));
+ configStr),
+ e);
}
} else {
return configValue;