This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bcc4a00af9 [fix][client] Avoid closing reused ServiceUrlProvider in 
another PulsarClientImpl instance (#25947)
4bcc4a00af9 is described below

commit 4bcc4a00af9cfe3d5fdfbb46743fd1b731514862
Author: Oneby Wang <[email protected]>
AuthorDate: Mon Jun 8 07:05:56 2026 +0800

    [fix][client] Avoid closing reused ServiceUrlProvider in another 
PulsarClientImpl instance (#25947)
---
 .../pulsar/client/impl/PulsarClientImpl.java       |  4 +-
 .../pulsar/client/impl/PulsarClientImplTest.java   | 54 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 610cfb78c6c..23e6b94dcea 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -121,6 +121,7 @@ public class PulsarClientImpl implements PulsarClient {
     @Getter
     private final Timer timer;
     private boolean needStopTimer;
+    private boolean serviceUrlProviderInitialized;
     private final ExecutorProvider externalExecutorProvider;
     private final ExecutorProvider internalExecutorProvider;
     private final ExecutorProvider lookupExecutorProvider;
@@ -274,6 +275,7 @@ public class PulsarClientImpl implements PulsarClient {
 
             if (conf.getServiceUrlProvider() != null) {
                 conf.getServiceUrlProvider().initialize(this);
+                serviceUrlProviderInitialized = true;
             }
 
             if (conf.isEnableTransaction()) {
@@ -1064,7 +1066,7 @@ public class PulsarClientImpl implements PulsarClient {
             }
 
             // close the service url provider allocated resource.
-            if (conf != null && conf.getServiceUrlProvider() != null) {
+            if (conf != null && conf.getServiceUrlProvider() != null && 
serviceUrlProviderInitialized) {
                 conf.getServiceUrlProvider().close();
             }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index b675f6e971e..b742af7336d 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -53,7 +53,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.regex.Pattern;
 import lombok.Cleanup;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
@@ -219,6 +221,31 @@ public class PulsarClientImplTest {
         }
     }
 
+    @Test
+    public void 
testFailedServiceUrlProviderInitializationDoesNotCloseProvider() throws 
Exception {
+        CloseCountingServiceUrlProvider provider = new 
CloseCountingServiceUrlProvider();
+
+        ClientConfigurationData firstConf = new ClientConfigurationData();
+        firstConf.setServiceUrl(provider.getServiceUrl());
+        firstConf.setServiceUrlProvider(provider);
+        initializeEventLoopGroup(firstConf);
+
+        PulsarClientImpl client = new PulsarClientImpl(firstConf, 
eventLoopGroup);
+        assertEquals(provider.getCloseCount(), 0);
+
+        ClientConfigurationData secondConf = new ClientConfigurationData();
+        secondConf.setServiceUrl(provider.getServiceUrl());
+        secondConf.setServiceUrlProvider(provider);
+
+        Throwable error = 
org.testng.Assert.expectThrows(IllegalStateException.class,
+                () -> new PulsarClientImpl(secondConf, eventLoopGroup));
+        assertEquals(error.getMessage(), "ServiceUrlProvider has already been 
initialized");
+        assertEquals(provider.getCloseCount(), 0);
+
+        client.close();
+        assertEquals(provider.getCloseCount(), 1);
+    }
+
     @Test
     public void testInitializingWithExecutorProviders() throws 
PulsarClientException {
         ClientConfigurationData conf = new ClientConfigurationData();
@@ -342,4 +369,31 @@ public class PulsarClientImplTest {
         assertTrue(ex instanceof 
PulsarClientException.InvalidTopicNameException);
         assertTrue(ex.getMessage().contains("V5 client SDK"));
     }
+
+    private static class CloseCountingServiceUrlProvider implements 
ServiceUrlProvider {
+        private PulsarClient client;
+        private int closeCount;
+
+        @Override
+        public synchronized void initialize(PulsarClient client) {
+            if (this.client != null) {
+                throw new IllegalStateException("ServiceUrlProvider has 
already been initialized");
+            }
+            this.client = client;
+        }
+
+        @Override
+        public String getServiceUrl() {
+            return "pulsar://localhost:6650";
+        }
+
+        @Override
+        public synchronized void close() {
+            closeCount++;
+        }
+
+        synchronized int getCloseCount() {
+            return closeCount;
+        }
+    }
 }

Reply via email to