This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fb83c7d46bfb59bf069b31902dc85c969588a506 Author: Oneby Wang <[email protected]> AuthorDate: Thu Jun 4 01:44:38 2026 +0800 [fix][client] Prevent duplicate ServiceUrlProvider initialization (#25899) (cherry picked from commit 882946c59d37b2a4d7e474bfa3a7d8da4d7fa59d) --- ...SameAuthParamsLookupAutoClusterFailoverTest.java | 14 ++++++++++++++ .../pulsar/client/api/ServiceUrlProvider.java | 11 ++++++++++- .../pulsar/client/impl/AutoClusterFailover.java | 15 +++++++++++++-- .../client/impl/ControlledClusterFailover.java | 15 +++++++++++++-- .../SameAuthParamsLookupAutoClusterFailover.java | 16 +++++++++++++--- .../pulsar/client/impl/AutoClusterFailoverTest.java | 21 +++++++++++++++++++++ .../client/impl/ControlledClusterFailoverTest.java | 17 +++++++++++++++++ 7 files changed, 101 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java index 1314be12c9d..a2e587fef39 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java @@ -172,6 +172,20 @@ public class SameAuthParamsLookupAutoClusterFailoverTest extends OneWayReplicato dummyServer.close(); } + @Test + public void testInitializeCanOnlyBeCalledOnce() throws Exception { + setup(); + final SameAuthParamsLookupAutoClusterFailover failover = SameAuthParamsLookupAutoClusterFailover.builder() + .pulsarServiceUrlArray(new String[]{pulsar1.getBrokerServiceUrl()}) + .checkHealthyIntervalMs(1000) + .build(); + + try (PulsarClient client = PulsarClient.builder().serviceUrlProvider(failover).build()) { + Throwable error = Assert.expectThrows(IllegalStateException.class, () -> failover.initialize(client)); + Assert.assertEquals(error.getMessage(), "ServiceUrlProvider has already been initialized"); + } + } + @Override protected void cleanupPulsarResources() { // Nothing to do. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java index e8b513b103f..f95f650cbb7 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java @@ -27,6 +27,11 @@ import org.apache.pulsar.common.classification.InterfaceStability; * <p>This allows applications to retrieve the service URL from an external configuration provider and, * more importantly, to force the Pulsar client to reconnect if the service URL has been changed. * + * <p>Each provider instance is tied to the lifecycle of one {@link PulsarClient} instance. The client + * initializes the provider when the client is created and closes the provider when the owning client is + * closed. Applications that create multiple Pulsar clients should create a separate provider instance + * for each client instead of sharing one provider. + * * <p>It can be passed with {@link ClientBuilder#serviceUrlProvider(ServiceUrlProvider)} */ @InterfaceAudience.Public @@ -39,6 +44,9 @@ public interface ServiceUrlProvider extends AutoCloseable { * <p>This can be used by the provider to force the Pulsar client to reconnect whenever the service url might have * changed. See {@link PulsarClient#updateServiceUrl(String)}. * + * <p>This method is invoked by the Pulsar client and is expected to be called once for a provider + * instance. Implementations may reject repeated initialization. + * * @param client * created pulsar client. */ @@ -52,7 +60,8 @@ public interface ServiceUrlProvider extends AutoCloseable { String getServiceUrl(); /** - * Close the resource that the provider allocated. + * Close the resource that the provider allocated. The owning Pulsar client invokes this method when + * it is closed. * */ @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java index 844d1e2d253..5657d5067c0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java @@ -39,6 +39,14 @@ import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; +/** + * A service URL provider that automatically fails over from the primary Pulsar service URL to one of + * the secondary service URLs and switches back after the primary service recovers. + * + * <p>Each instance is tied to the lifecycle of one {@link PulsarClient}. Once initialized by a + * Pulsar client, it must not be reused by another client. Create a new provider instance for each + * Pulsar client. + */ @Slf4j @Data public class AutoClusterFailover implements ServiceUrlProvider { @@ -84,7 +92,10 @@ public class AutoClusterFailover implements ServiceUrlProvider { } @Override - public void initialize(PulsarClient client) { + public synchronized void initialize(PulsarClient client) { + if (this.pulsarClient != null) { + throw new IllegalStateException("ServiceUrlProvider has already been initialized"); + } this.pulsarClient = (PulsarClientImpl) client; ClientConfigurationData config = pulsarClient.getConfiguration(); if (config != null) { @@ -120,7 +131,7 @@ public class AutoClusterFailover implements ServiceUrlProvider { } @Override - public void close() { + public synchronized void close() { this.executor.shutdown(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java index 1d4740b8472..db3d7939d3b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java @@ -55,6 +55,14 @@ import org.asynchttpclient.Response; import org.asynchttpclient.channel.DefaultKeepAliveStrategy; import org.jspecify.annotations.Nullable; +/** + * A service URL provider that fetches controlled failover configuration from an external HTTP service + * and updates the Pulsar client when the returned configuration changes. + * + * <p>Each instance is tied to the lifecycle of one {@link PulsarClient}. Once initialized by a + * Pulsar client, it must not be reused by another client. Create a new provider instance for each + * Pulsar client. + */ @Slf4j public class ControlledClusterFailover implements ServiceUrlProvider { private static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; @@ -107,7 +115,10 @@ public class ControlledClusterFailover implements ServiceUrlProvider { } @Override - public void initialize(PulsarClient client) { + public synchronized void initialize(PulsarClient client) { + if (this.pulsarClient != null) { + throw new IllegalStateException("ServiceUrlProvider has already been initialized"); + } this.pulsarClient = (PulsarClientImpl) client; // Initialize currentControlledConfiguration from client's current configuration @@ -210,7 +221,7 @@ public class ControlledClusterFailover implements ServiceUrlProvider { } @Override - public void close() { + public synchronized void close() { this.executor.shutdown(); if (httpClient != null) { try { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java index 448dd95c4fd..69c74bd28d9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java @@ -37,6 +37,14 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; +/** + * A service URL provider that probes multiple Pulsar service URLs with the same authentication + * parameters and fails over according to service health. + * + * <p>Each instance is tied to the lifecycle of one {@link PulsarClient}. Once initialized by a + * Pulsar client, it must not be reused by another client. Create a new provider instance for each + * Pulsar client. + */ @Slf4j @SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}) public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvider { @@ -65,7 +73,10 @@ public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvid private SameAuthParamsLookupAutoClusterFailover() {} @Override - public void initialize(PulsarClient client) { + public synchronized void initialize(PulsarClient client) { + if (this.pulsarClient != null) { + throw new IllegalStateException("ServiceUrlProvider has already been initialized"); + } this.currentPulsarServiceIndex = 0; this.pulsarClient = (PulsarClientImpl) client; this.executor = EventLoopUtil.newEventLoopGroup(1, false, @@ -111,7 +122,7 @@ public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvid } @Override - public void close() throws Exception { + public synchronized void close() throws Exception { if (closed) { return; } @@ -371,4 +382,3 @@ public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvid } } } - diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java index b275ffb6012..00688db800f 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java @@ -31,10 +31,12 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.awaitility.Awaitility; import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.Test; @Test(groups = "broker-impl") @@ -148,6 +150,25 @@ public class AutoClusterFailoverTest { } } + @Test + public void testInitializeCanOnlyBeCalledOnce() throws Exception { + String primary = "pulsar://localhost:6650"; + String secondary = "pulsar://localhost:6651"; + + ServiceUrlProvider provider = AutoClusterFailover.builder() + .primary(primary) + .secondary(Collections.singletonList(secondary)) + .failoverDelay(1, TimeUnit.SECONDS) + .switchBackDelay(1, TimeUnit.SECONDS) + .checkInterval(30, TimeUnit.SECONDS) + .build(); + + try (PulsarClient client = PulsarClient.builder().serviceUrlProvider(provider).build()) { + Throwable error = Assert.expectThrows(IllegalStateException.class, () -> provider.initialize(client)); + assertEquals(error.getMessage(), "ServiceUrlProvider has already been initialized"); + } + } + @Test public void testAutoClusterFailoverSwitchWithoutAuthentication() throws Exception { String primary = "pulsar://localhost:6650"; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java index 86b2fa7cb4f..cc47841ce0a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.asynchttpclient.Request; @@ -73,6 +74,22 @@ public class ControlledClusterFailoverTest { Assert.assertEquals(request.getHeaders().get(keyB), valueB); } + @Test + public void testInitializeCanOnlyBeCalledOnce() throws Exception { + String defaultServiceUrl = "pulsar://localhost:6650"; + String urlProvider = "http://localhost:8080/test"; + + ServiceUrlProvider provider = ControlledClusterFailover.builder() + .defaultServiceUrl(defaultServiceUrl) + .urlProvider(urlProvider) + .build(); + + try (PulsarClient client = PulsarClient.builder().serviceUrlProvider(provider).build()) { + Throwable error = Assert.expectThrows(IllegalStateException.class, () -> provider.initialize(client)); + Assert.assertEquals(error.getMessage(), "ServiceUrlProvider has already been initialized"); + } + } + @Test public void testControlledClusterFailoverSwitch() throws Exception { String defaultServiceUrl = "pulsar+ssl://localhost:6651";
