This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 882946c59d3 [fix][client] Prevent duplicate ServiceUrlProvider
initialization (#25899)
882946c59d3 is described below
commit 882946c59d37b2a4d7e474bfa3a7d8da4d7fa59d
Author: Oneby Wang <[email protected]>
AuthorDate: Thu Jun 4 01:44:38 2026 +0800
[fix][client] Prevent duplicate ServiceUrlProvider initialization (#25899)
---
...SameAuthParamsLookupAutoClusterFailoverTest.java | 14 ++++++++++++++
.../pulsar/client/api/ServiceUrlProvider.java | 11 ++++++++++-
.../pulsar/client/impl/AutoClusterFailover.java | 15 +++++++++++++--
.../client/impl/ControlledClusterFailover.java | 15 +++++++++++++--
.../SameAuthParamsLookupAutoClusterFailover.java | 16 +++++++++++++---
.../pulsar/client/impl/AutoClusterFailoverTest.java | 21 +++++++++++++++++++++
.../client/impl/ControlledClusterFailoverTest.java | 17 +++++++++++++++++
7 files changed, 101 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
index 128be102f58..60324baf076 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
@@ -140,6 +140,20 @@ public class SameAuthParamsLookupAutoClusterFailoverTest
extends OneWayReplicato
dummyServer.close();
}
+ @Test
+ public void testInitializeCanOnlyBeCalledOnce() throws Exception {
+ setup();
+ final SameAuthParamsLookupAutoClusterFailover failover =
SameAuthParamsLookupAutoClusterFailover.builder()
+ .pulsarServiceUrlArray(new
String[]{pulsar1.getBrokerServiceUrl()})
+ .checkHealthyIntervalMs(1000)
+ .build();
+
+ try (PulsarClient client =
PulsarClient.builder().serviceUrlProvider(failover).build()) {
+ Throwable error = Assert.expectThrows(IllegalStateException.class,
() -> failover.initialize(client));
+ Assert.assertEquals(error.getMessage(), "ServiceUrlProvider has
already been initialized");
+ }
+ }
+
/**
* Wait for the state machine to converge to the expected per-index states
and current index.
* The state read happens on the failover executor to avoid races with the
periodic check task,
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
index e8b513b103f..f95f650cbb7 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
@@ -27,6 +27,11 @@ import
org.apache.pulsar.common.classification.InterfaceStability;
* <p>This allows applications to retrieve the service URL from an external
configuration provider and,
* more importantly, to force the Pulsar client to reconnect if the service
URL has been changed.
*
+ * <p>Each provider instance is tied to the lifecycle of one {@link
PulsarClient} instance. The client
+ * initializes the provider when the client is created and closes the provider
when the owning client is
+ * closed. Applications that create multiple Pulsar clients should create a
separate provider instance
+ * for each client instead of sharing one provider.
+ *
* <p>It can be passed with {@link
ClientBuilder#serviceUrlProvider(ServiceUrlProvider)}
*/
@InterfaceAudience.Public
@@ -39,6 +44,9 @@ public interface ServiceUrlProvider extends AutoCloseable {
* <p>This can be used by the provider to force the Pulsar client to
reconnect whenever the service url might have
* changed. See {@link PulsarClient#updateServiceUrl(String)}.
*
+ * <p>This method is invoked by the Pulsar client and is expected to be
called once for a provider
+ * instance. Implementations may reject repeated initialization.
+ *
* @param client
* created pulsar client.
*/
@@ -52,7 +60,8 @@ public interface ServiceUrlProvider extends AutoCloseable {
String getServiceUrl();
/**
- * Close the resource that the provider allocated.
+ * Close the resource that the provider allocated. The owning Pulsar
client invokes this method when
+ * it is closed.
*
*/
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index 26d10dde3ad..b3d0e009821 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -40,6 +40,14 @@ import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
+/**
+ * A service URL provider that automatically fails over from the primary
Pulsar service URL to one of
+ * the secondary service URLs and switches back after the primary service
recovers.
+ *
+ * <p>Each instance is tied to the lifecycle of one {@link PulsarClient}. Once
initialized by a
+ * Pulsar client, it must not be reused by another client. Create a new
provider instance for each
+ * Pulsar client.
+ */
@CustomLog
@Data
public class AutoClusterFailover implements ServiceUrlProvider {
@@ -86,7 +94,10 @@ public class AutoClusterFailover implements
ServiceUrlProvider {
}
@Override
- public void initialize(PulsarClient client) {
+ public synchronized void initialize(PulsarClient client) {
+ if (this.pulsarClient != null) {
+ throw new IllegalStateException("ServiceUrlProvider has already
been initialized");
+ }
this.pulsarClient = (PulsarClientImpl) client;
this.addressResolver = pulsarClient.getAddressResolver();
ClientConfigurationData config = pulsarClient.getConfiguration();
@@ -123,7 +134,7 @@ public class AutoClusterFailover implements
ServiceUrlProvider {
}
@Override
- public void close() {
+ public synchronized void close() {
this.executor.shutdown();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
index 1252819a561..e4ad4ed1d1f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
@@ -57,6 +57,14 @@ import org.asynchttpclient.Response;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.jspecify.annotations.Nullable;
+/**
+ * A service URL provider that fetches controlled failover configuration from
an external HTTP service
+ * and updates the Pulsar client when the returned configuration changes.
+ *
+ * <p>Each instance is tied to the lifecycle of one {@link PulsarClient}. Once
initialized by a
+ * Pulsar client, it must not be reused by another client. Create a new
provider instance for each
+ * Pulsar client.
+ */
@CustomLog
public class ControlledClusterFailover implements ServiceUrlProvider {
private static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
@@ -111,7 +119,10 @@ public class ControlledClusterFailover implements
ServiceUrlProvider {
}
@Override
- public void initialize(PulsarClient client) {
+ public synchronized void initialize(PulsarClient client) {
+ if (this.pulsarClient != null) {
+ throw new IllegalStateException("ServiceUrlProvider has already
been initialized");
+ }
this.pulsarClient = (PulsarClientImpl) client;
this.httpClient = buildHttpClient();
this.requestBuilder = httpClient.prepareGet(urlProvider)
@@ -223,7 +234,7 @@ public class ControlledClusterFailover implements
ServiceUrlProvider {
}
@Override
- public void close() {
+ public synchronized void close() {
this.executor.shutdown();
if (httpClient != null) {
try {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
index 8abc0984c91..8d1bd777a7f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
@@ -37,6 +37,14 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+/**
+ * A service URL provider that probes multiple Pulsar service URLs with the
same authentication
+ * parameters and fails over according to service health.
+ *
+ * <p>Each instance is tied to the lifecycle of one {@link PulsarClient}. Once
initialized by a
+ * Pulsar client, it must not be reused by another client. Create a new
provider instance for each
+ * Pulsar client.
+ */
@CustomLog
@SuppressFBWarnings(value = {"EI_EXPOSE_REP2"})
public class SameAuthParamsLookupAutoClusterFailover implements
ServiceUrlProvider {
@@ -65,7 +73,10 @@ public class SameAuthParamsLookupAutoClusterFailover
implements ServiceUrlProvid
private SameAuthParamsLookupAutoClusterFailover() {}
@Override
- public void initialize(PulsarClient client) {
+ public synchronized void initialize(PulsarClient client) {
+ if (this.pulsarClient != null) {
+ throw new IllegalStateException("ServiceUrlProvider has already
been initialized");
+ }
this.currentPulsarServiceIndex = 0;
this.pulsarClient = (PulsarClientImpl) client;
this.executor = EventLoopUtil.newEventLoopGroup(1, false,
@@ -110,7 +121,7 @@ public class SameAuthParamsLookupAutoClusterFailover
implements ServiceUrlProvid
@SuppressWarnings("deprecation")
@Override
- public void close() throws Exception {
+ public synchronized void close() throws Exception {
if (closed) {
return;
}
@@ -377,4 +388,3 @@ public class SameAuthParamsLookupAutoClusterFailover
implements ServiceUrlProvid
}
}
}
-
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
index 4596ac521b5..204225c03c8 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
@@ -31,10 +31,12 @@ import lombok.Cleanup;
import lombok.CustomLog;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
@@ -148,6 +150,25 @@ public class AutoClusterFailoverTest {
}
}
+ @Test
+ public void testInitializeCanOnlyBeCalledOnce() throws Exception {
+ String primary = "pulsar://localhost:6650";
+ String secondary = "pulsar://localhost:6651";
+
+ ServiceUrlProvider provider = AutoClusterFailover.builder()
+ .primary(primary)
+ .secondary(Collections.singletonList(secondary))
+ .failoverDelay(1, TimeUnit.SECONDS)
+ .switchBackDelay(1, TimeUnit.SECONDS)
+ .checkInterval(30, TimeUnit.SECONDS)
+ .build();
+
+ try (PulsarClient client =
PulsarClient.builder().serviceUrlProvider(provider).build()) {
+ Throwable error = Assert.expectThrows(IllegalStateException.class,
() -> provider.initialize(client));
+ assertEquals(error.getMessage(), "ServiceUrlProvider has already
been initialized");
+ }
+ }
+
@Test
public void testAutoClusterFailoverSwitchWithoutAuthentication() throws
Exception {
String primary = "pulsar://localhost:6650";
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
index 86b2fa7cb4f..cc47841ce0a 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.asynchttpclient.Request;
@@ -73,6 +74,22 @@ public class ControlledClusterFailoverTest {
Assert.assertEquals(request.getHeaders().get(keyB), valueB);
}
+ @Test
+ public void testInitializeCanOnlyBeCalledOnce() throws Exception {
+ String defaultServiceUrl = "pulsar://localhost:6650";
+ String urlProvider = "http://localhost:8080/test";
+
+ ServiceUrlProvider provider = ControlledClusterFailover.builder()
+ .defaultServiceUrl(defaultServiceUrl)
+ .urlProvider(urlProvider)
+ .build();
+
+ try (PulsarClient client =
PulsarClient.builder().serviceUrlProvider(provider).build()) {
+ Throwable error = Assert.expectThrows(IllegalStateException.class,
() -> provider.initialize(client));
+ Assert.assertEquals(error.getMessage(), "ServiceUrlProvider has
already been initialized");
+ }
+ }
+
@Test
public void testControlledClusterFailoverSwitch() throws Exception {
String defaultServiceUrl = "pulsar+ssl://localhost:6651";