This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 85bdf67617c [fix][client] Don't pin broker-assigned redirect URL 
across reconnect retries (#26009)
85bdf67617c is described below

commit 85bdf67617cc922b51cf59391f9caca697ca472a
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jun 12 08:23:38 2026 -0700

    [fix][client] Don't pin broker-assigned redirect URL across reconnect 
retries (#26009)
---
 .../client/impl/BrokerClientIntegrationTest.java   | 42 ++++++++++++++++++++++
 .../pulsar/client/impl/ConnectionHandler.java      | 12 +++++--
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index ddc955f84d8..959bbe6a80f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -41,6 +41,8 @@ import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import java.io.InputStream;
 import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
@@ -48,6 +50,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableMap;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -1128,4 +1131,43 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         consumer1.unsubscribe(true);
         assertFalse(consumer1.isConnected());
     }
+
+    /**
+     * Regression test for https://github.com/apache/pulsar/issues/25997: a 
broker-assigned redirect
+     * ({@code CloseProducer}/{@code CloseConsumer} carrying an {@code 
assignedBrokerServiceUrl})
+     * pointing at a wrong or unreachable broker must be honored for the 
immediate reconnect attempt
+     * only. If that attempt fails, subsequent retries must fall back to topic 
lookup instead of
+     * staying pinned to the stale address.
+     */
+    @Test
+    public void testStaleBrokerRedirectFallsBackToLookup() throws Exception {
+        String topic = "persistent://my-property/my-ns/staleRedirectFallback";
+        @Cleanup
+        Producer<byte[]> producerInstance = 
pulsarClient.newProducer().topic(topic).create();
+        @Cleanup
+        Consumer<byte[]> consumerInstance = 
pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("my-subscriber-name").subscribe();
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
producerInstance;
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
consumerInstance;
+
+        // An address with no listener behind it: dialing the redirect fails 
immediately.
+        URI unreachable;
+        try (ServerSocket socket = new ServerSocket(0)) {
+            unreachable = URI.create("pulsar://127.0.0.1:" + 
socket.getLocalPort());
+        }
+
+        // Deliver the disconnect+redirect exactly as 
ClientCnx#handleCloseProducer and
+        // #handleCloseConsumer do for a broker unload notification.
+        producer.connectionClosed(producer.getClientCnx(), Optional.of(0L), 
Optional.of(unreachable));
+        consumer.connectionClosed(consumer.getClientCnx(), Optional.of(0L), 
Optional.of(unreachable));
+
+        // The redirect dial fails; the clients must recover by looking up the 
topic again.
+        Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(producer.getState(), State.Ready);
+            assertEquals(consumer.getState(), State.Ready);
+        });
+
+        producer.send("msg".getBytes(UTF_8));
+        assertNotNull(consumer.receive(10, TimeUnit.SECONDS));
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index e398b0583fa..5538feb8537 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -227,9 +227,15 @@ public class ConnectionHandler {
     public void connectionClosed(ClientCnx cnx, Optional<Long> 
initialConnectionDelayMs, Optional<URI> hostUrl) {
         lastConnectionClosedTimestamp = System.currentTimeMillis();
         duringConnect.set(false);
-        // Remember an explicit reconnect target so a later first-attempt 
failure (reconnectLater)
-        // re-dials the same broker rather than falling back to the service 
URL.
-        hostUrl.ifPresent(uri -> this.explicitHostURI = uri);
+        // Only handlers already pinned to an explicit host (v5 TC 
metadata-store discovery) update
+        // the pin here, so a later first-attempt failure (reconnectLater) 
re-dials the current
+        // leader rather than falling back to the service URL. For 
lookup-based handlers
+        // (producers/consumers), the broker-assigned redirect in hostUrl may 
be wrong or stale:
+        // it is honored for the immediate reconnect attempt below only, and 
retries after a
+        // failure go through a fresh topic lookup instead of staying pinned 
to it.
+        if (explicitHostURI != null) {
+            hostUrl.ifPresent(uri -> this.explicitHostURI = uri);
+        }
         state.client.getCnxPool().releaseConnection(cnx);
         if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
             if (!state.changeToConnecting()) {

Reply via email to