This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4bcc4a00af9 [fix][client] Avoid closing reused ServiceUrlProvider in
another PulsarClientImpl instance (#25947)
4bcc4a00af9 is described below
commit 4bcc4a00af9cfe3d5fdfbb46743fd1b731514862
Author: Oneby Wang <[email protected]>
AuthorDate: Mon Jun 8 07:05:56 2026 +0800
[fix][client] Avoid closing reused ServiceUrlProvider in another
PulsarClientImpl instance (#25947)
---
.../pulsar/client/impl/PulsarClientImpl.java | 4 +-
.../pulsar/client/impl/PulsarClientImplTest.java | 54 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 610cfb78c6c..23e6b94dcea 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -121,6 +121,7 @@ public class PulsarClientImpl implements PulsarClient {
@Getter
private final Timer timer;
private boolean needStopTimer;
+ private boolean serviceUrlProviderInitialized;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;
private final ExecutorProvider lookupExecutorProvider;
@@ -274,6 +275,7 @@ public class PulsarClientImpl implements PulsarClient {
if (conf.getServiceUrlProvider() != null) {
conf.getServiceUrlProvider().initialize(this);
+ serviceUrlProviderInitialized = true;
}
if (conf.isEnableTransaction()) {
@@ -1064,7 +1066,7 @@ public class PulsarClientImpl implements PulsarClient {
}
// close the service url provider allocated resource.
- if (conf != null && conf.getServiceUrlProvider() != null) {
+ if (conf != null && conf.getServiceUrlProvider() != null &&
serviceUrlProviderInitialized) {
conf.getServiceUrlProvider().close();
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index b675f6e971e..b742af7336d 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -53,7 +53,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.regex.Pattern;
import lombok.Cleanup;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
@@ -219,6 +221,31 @@ public class PulsarClientImplTest {
}
}
+ @Test
+ public void
testFailedServiceUrlProviderInitializationDoesNotCloseProvider() throws
Exception {
+ CloseCountingServiceUrlProvider provider = new
CloseCountingServiceUrlProvider();
+
+ ClientConfigurationData firstConf = new ClientConfigurationData();
+ firstConf.setServiceUrl(provider.getServiceUrl());
+ firstConf.setServiceUrlProvider(provider);
+ initializeEventLoopGroup(firstConf);
+
+ PulsarClientImpl client = new PulsarClientImpl(firstConf,
eventLoopGroup);
+ assertEquals(provider.getCloseCount(), 0);
+
+ ClientConfigurationData secondConf = new ClientConfigurationData();
+ secondConf.setServiceUrl(provider.getServiceUrl());
+ secondConf.setServiceUrlProvider(provider);
+
+ Throwable error =
org.testng.Assert.expectThrows(IllegalStateException.class,
+ () -> new PulsarClientImpl(secondConf, eventLoopGroup));
+ assertEquals(error.getMessage(), "ServiceUrlProvider has already been
initialized");
+ assertEquals(provider.getCloseCount(), 0);
+
+ client.close();
+ assertEquals(provider.getCloseCount(), 1);
+ }
+
@Test
public void testInitializingWithExecutorProviders() throws
PulsarClientException {
ClientConfigurationData conf = new ClientConfigurationData();
@@ -342,4 +369,31 @@ public class PulsarClientImplTest {
assertTrue(ex instanceof
PulsarClientException.InvalidTopicNameException);
assertTrue(ex.getMessage().contains("V5 client SDK"));
}
+
+ private static class CloseCountingServiceUrlProvider implements
ServiceUrlProvider {
+ private PulsarClient client;
+ private int closeCount;
+
+ @Override
+ public synchronized void initialize(PulsarClient client) {
+ if (this.client != null) {
+ throw new IllegalStateException("ServiceUrlProvider has
already been initialized");
+ }
+ this.client = client;
+ }
+
+ @Override
+ public String getServiceUrl() {
+ return "pulsar://localhost:6650";
+ }
+
+ @Override
+ public synchronized void close() {
+ closeCount++;
+ }
+
+ synchronized int getCloseCount() {
+ return closeCount;
+ }
+ }
}