This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f9c1fe022b0921d774cf294aa5d4463674a1c9ba Author: Xu Chengxin <[email protected]> AuthorDate: Wed Jun 11 00:50:12 2025 +0800 [fix][client] Fix some potential resource leak (#24402) Fix resource leak using try-catch stmt method in #24389, and close other leaks in this project I've found later (cherry picked from commit bfee7a68f84611b679a2d0e8942045462328f10a) --- .../org/apache/pulsar/client/impl/AutoClusterFailover.java | 6 +++--- .../org/apache/pulsar/functions/runtime/RuntimeUtils.java | 14 +++++++------- .../apache/pulsar/functions/worker/FunctionActioner.java | 12 +++++------- .../pulsar/functions/worker/rest/api/ComponentImpl.java | 5 +++-- .../apache/pulsar/testclient/LoadSimulationController.java | 13 ++++++------- 5 files changed, 24 insertions(+), 26 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java index 68b781e67d2..844d1e2d253 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java @@ -128,9 +128,9 @@ public class AutoClusterFailover implements ServiceUrlProvider { try { resolver.updateServiceUrl(url); InetSocketAddress endpoint = resolver.resolveHost(); - Socket socket = new Socket(); - socket.connect(new InetSocketAddress(endpoint.getHostName(), endpoint.getPort()), TIMEOUT); - socket.close(); + try (Socket socket = new Socket()) { + socket.connect(new InetSocketAddress(endpoint.getHostName(), endpoint.getPort()), TIMEOUT); + } return true; } catch (Exception e) { log.warn("Failed to probe available, url: {}", url, e); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 49a5dd40fa2..ae58fc40e2b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -540,17 +540,17 @@ public class RuntimeUtils { } public static String getPrometheusMetrics(int metricsPort) throws IOException { - StringBuilder result = new StringBuilder(); URL url = new URL(String.format("http://%s:%s", InetAddress.getLocalHost().getHostAddress(), metricsPort)); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod("GET"); - BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream())); - String line; - while ((line = rd.readLine()) != null) { - result.append(line + System.lineSeparator()); + try (BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()))) { + StringBuilder result = new StringBuilder(); + String line; + while ((line = rd.readLine()) != null) { + result.append(line + System.lineSeparator()); + } + return result.toString(); } - rd.close(); - return result.toString(); } /** diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 389051fce42..4bdd120e215 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -266,13 +266,11 @@ public class FunctionActioner { } else if (downloadFromPackageManagementService) { getPulsarAdmin().packages().download(pkgLocationPath, tempPkgFile.getPath()); } else { - FileOutputStream tempPkgFos = new FileOutputStream(tempPkgFile); - WorkerUtils.downloadFromBookkeeper( - dlogNamespace, - tempPkgFos, - pkgLocationPath); - if (tempPkgFos != null) { - tempPkgFos.close(); + try (FileOutputStream tempPkgFos = new FileOutputStream(tempPkgFile)) { + WorkerUtils.downloadFromBookkeeper( + dlogNamespace, + tempPkgFos, + pkgLocationPath); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index ba87713d3c1..4058ebb667c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -1283,8 +1283,9 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> { if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { File tempFile = createPkgTempFile(); tempFile.deleteOnExit(); - FileOutputStream out = new FileOutputStream(tempFile); - IOUtils.copy(uploadedInputStream, out); + try (FileOutputStream out = new FileOutputStream(tempFile)) { + IOUtils.copy(uploadedInputStream, out); + } PackageMetadata metadata = PackageMetadata.builder().createTime(System.currentTimeMillis()).build(); worker().getBrokerAdmin().packages().upload(metadata, path, tempFile.getAbsolutePath()); } else { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java index 2ba3eac171b..26f8cae012f 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java @@ -638,14 +638,13 @@ public class LoadSimulationController extends CmdBase{ final List<String> commandArguments = arguments.commandArguments; checkAppArgs(commandArguments.size() - 1, 1); final String scriptName = commandArguments.get(1); - final BufferedReader scriptReader = new BufferedReader( - new InputStreamReader(new FileInputStream(Paths.get(scriptName).toFile()))); - String line = scriptReader.readLine(); - while (line != null) { - read(line.split("\\s+")); - line = scriptReader.readLine(); + try (BufferedReader scriptReader = new BufferedReader( + new InputStreamReader(new FileInputStream(Paths.get(scriptName).toFile())))) { + String line; + while ((line = scriptReader.readLine()) != null) { + read(line.split("\\s+")); + } } - scriptReader.close(); break; case "copy": handleCopy(arguments);
