This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bc3dc7727b1 [fix] [client] Fix resource leak in Pulsar Client since HttpLookupService doesn't get closed (#22858)
bc3dc7727b1 is described below

commit bc3dc7727b132dd88aa84f6befef42ea0646ec50
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jun 18 14:33:33 2024 +0800

    [fix] [client] Fix resource leak in Pulsar Client since HttpLookupService doesn't get closed (#22858)
---
 .../admin/PulsarClientImplMultiBrokersTest.java    | 79 ++++++++++++++++++++++
 .../pulsar/client/impl/PulsarClientImpl.java       | 22 ++++++
 2 files changed, 101 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PulsarClientImplMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PulsarClientImplMultiBrokersTest.java
new file mode 100644
index 00000000000..29604d0440b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PulsarClientImplMultiBrokersTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.fail;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.Test;
+
+/**
+ * Test multi-broker admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class PulsarClientImplMultiBrokersTest extends MultiBrokerBaseTest {
+    @Override
+    protected int numberOfAdditionalBrokers() {
+        return 3;
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setManagedLedgerMaxEntriesPerLedger(10);
+    }
+
+    @Override
+    protected void onCleanup() {
+        super.onCleanup();
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testReleaseUrlLookupServices() throws Exception {
+        PulsarClientImpl pulsarClient = (PulsarClientImpl) additionalBrokerClients.get(0);
+        Map<String, LookupService> urlLookupMap = WhiteboxImpl.getInternalState(pulsarClient, "urlLookupMap");
+        assertEquals(urlLookupMap.size(), 0);
+        for (PulsarService pulsar : additionalBrokers) {
+            pulsarClient.getLookup(pulsar.getBrokerServiceUrl());
+            pulsarClient.getLookup(pulsar.getWebServiceAddress());
+        }
+        assertEquals(urlLookupMap.size(), additionalBrokers.size() * 2);
+        // Verify: lookup services will be release.
+        pulsarClient.close();
+        assertEquals(urlLookupMap.size(), 0);
+        try {
+            for (PulsarService pulsar : additionalBrokers) {
+                pulsarClient.getLookup(pulsar.getBrokerServiceUrl());
+                pulsarClient.getLookup(pulsar.getWebServiceAddress());
+            }
+            fail("Expected a error when calling pulsarClient.getLookup if getLookup was closed");
+        } catch (IllegalStateException illegalArgumentException) {
+            assertTrue(illegalArgumentException.getMessage().contains("has been closed"));
+        }
+        assertEquals(urlLookupMap.size(), 0);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e8107efe98e..f4afb2931cc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -33,6 +33,7 @@ import java.time.Clock;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -744,6 +745,21 @@ public class PulsarClientImpl implements PulsarClient {
         }
     }
 
+    private void closeUrlLookupMap() {
+        Map<String, LookupService> closedUrlLookupServices = new HashMap(urlLookupMap.size());
+        urlLookupMap.entrySet().forEach(e -> {
+            try {
+                e.getValue().close();
+            } catch (Exception ex) {
+                log.error("Error closing lookup service {}", e.getKey(), ex);
+            }
+            closedUrlLookupServices.put(e.getKey(), e.getValue());
+        });
+        closedUrlLookupServices.entrySet().forEach(e -> {
+            urlLookupMap.remove(e.getKey(), e.getValue());
+        });
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         log.info("Client closing. URL: {}", lookup.getServiceUrl());
@@ -754,6 +770,8 @@ public class PulsarClientImpl implements PulsarClient {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
+        closeUrlLookupMap();
+
         producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
             if (t != null) {
                 log.error("Error closing producer {}", p, t);
@@ -982,6 +1000,10 @@ public class PulsarClientImpl implements PulsarClient {
 
     public LookupService getLookup(String serviceUrl) {
         return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
+            if (isClosed()) {
+                throw new IllegalStateException("Pulsar client has been closed, can not build LookupService when"
+                        + " calling get lookup with an url");
+            }
             try {
                 return createLookup(serviceUrl);
             } catch (PulsarClientException e) {

Reply via email to