This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new daa3066265c [fix][client] Make the whole grabCnx() progress atomic (#20595)
daa3066265c is described below

commit daa3066265c8cc02efdd07dedffbc3878541bd79
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jun 30 16:54:17 2023 +0800

    [fix][client] Make the whole grabCnx() progress atomic (#20595)
---
 .../pulsar/client/impl/ConnectionHandlerTest.java  | 154 +++++++++++++++++++++
 .../pulsar/client/impl/ConnectionHandler.java      |  50 ++++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  26 ++--
 .../apache/pulsar/client/impl/ProducerImpl.java    |  29 ++--
 .../pulsar/client/impl/TopicListWatcher.java       |  27 ++--
 .../client/impl/TransactionMetaStoreHandler.java   |  13 +-
 6 files changed, 246 insertions(+), 53 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
new file mode 100644
index 00000000000..f29d62db5f4
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class ConnectionHandlerTest extends ProducerConsumerBase {
+
+    private static final Backoff BACKOFF = new 
BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
+            .setMandatoryStop(1, TimeUnit.SECONDS)
+            .setMax(3, TimeUnit.SECONDS).create();
+    private final ExecutorService executor = Executors.newFixedThreadPool(4);
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        executor.shutdown();
+    }
+
+    @Test(timeOut = 30000)
+    public void testSynchronousGrabCnx() {
+        for (int i = 0; i < 10; i++) {
+            final CompletableFuture<Integer> future = new 
CompletableFuture<>();
+            final int index = i;
+            final ConnectionHandler handler = new ConnectionHandler(
+                    new MockedHandlerState((PulsarClientImpl) pulsarClient, 
"my-topic"), BACKOFF,
+                    cnx -> {
+                        future.complete(index);
+                        return CompletableFuture.completedFuture(null);
+                    });
+            handler.grabCnx();
+            Assert.assertEquals(future.join(), i);
+        }
+    }
+
+    @Test
+    public void testConcurrentGrabCnx() {
+        final AtomicInteger cnt = new AtomicInteger(0);
+        final ConnectionHandler handler = new ConnectionHandler(
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, 
"my-topic"), BACKOFF,
+                cnx -> {
+                    cnt.incrementAndGet();
+                    return CompletableFuture.completedFuture(null);
+                });
+        final int numGrab = 10;
+        for (int i = 0; i < numGrab; i++) {
+            handler.grabCnx();
+        }
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> cnt.get() 
> 0);
+        Assert.assertThrows(ConditionTimeoutException.class,
+                () -> 
Awaitility.await().atMost(Duration.ofMillis(500)).until(() -> cnt.get() == 
numGrab));
+        Assert.assertEquals(cnt.get(), 1);
+    }
+
+    @Test
+    public void testDuringConnectInvokeCount() throws IllegalAccessException {
+        // 1. connectionOpened completes with null
+        final AtomicBoolean duringConnect = spy(new AtomicBoolean());
+        final ConnectionHandler handler1 = new ConnectionHandler(
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, 
"my-topic"), BACKOFF,
+                cnx -> CompletableFuture.completedFuture(null));
+        FieldUtils.writeField(handler1, "duringConnect", duringConnect, true);
+        handler1.grabCnx();
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> 
!duringConnect.get());
+        verify(duringConnect, times(1)).compareAndSet(false, true);
+        verify(duringConnect, times(1)).set(false);
+
+        // 2. connectionFailed is called
+        final ConnectionHandler handler2 = new ConnectionHandler(
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, null), 
new MockedBackoff(),
+                cnx -> CompletableFuture.completedFuture(null));
+        FieldUtils.writeField(handler2, "duringConnect", duringConnect, true);
+        handler2.grabCnx();
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> 
!duringConnect.get());
+        verify(duringConnect, times(2)).compareAndSet(false, true);
+        verify(duringConnect, times(2)).set(false);
+
+        // 3. connectionOpened completes exceptionally
+        final ConnectionHandler handler3 = new ConnectionHandler(
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, 
"my-topic"), new MockedBackoff(),
+                cnx -> FutureUtil.failedFuture(new RuntimeException("fail")));
+        FieldUtils.writeField(handler3, "duringConnect", duringConnect, true);
+        handler3.grabCnx();
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> 
!duringConnect.get());
+        verify(duringConnect, times(3)).compareAndSet(false, true);
+        verify(duringConnect, times(3)).set(false);
+    }
+
+    private static class MockedHandlerState extends HandlerState {
+
+        public MockedHandlerState(PulsarClientImpl client, String topic) {
+            super(client, topic);
+        }
+
+        @Override
+        String getHandlerName() {
+            return "mocked";
+        }
+    }
+
+    private static class MockedBackoff extends Backoff {
+
+        // Set a large backoff so that reconnection won't happen in tests
+        public MockedBackoff() {
+            super(1, TimeUnit.HOURS, 2, TimeUnit.HOURS, 1, TimeUnit.HOURS);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 6c5a5be200f..ea1e09670e9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -40,10 +41,16 @@ public class ConnectionHandler {
     // Start with -1L because it gets incremented before sending on the first 
connection
     private volatile long epoch = -1L;
     protected volatile long lastConnectionClosedTimestamp = 0L;
+    private final AtomicBoolean duringConnect = new AtomicBoolean(false);
 
     interface Connection {
-        void connectionFailed(PulsarClientException exception);
-        void connectionOpened(ClientCnx cnx);
+
+        /**
+         * @apiNote If the returned future is completed exceptionally, 
reconnectLater will be called.
+         */
+        CompletableFuture<Void> connectionOpened(ClientCnx cnx);
+        default void connectionFailed(PulsarClientException e) {
+        }
     }
 
     protected Connection connection;
@@ -68,6 +75,11 @@ public class ConnectionHandler {
                     state.topic, state.getHandlerName(), state.getState());
             return;
         }
+        if (!duringConnect.compareAndSet(false, true)) {
+            log.info("[{}] [{}] Skip grabbing the connection since there is a 
pending connection",
+                    state.topic, state.getHandlerName());
+            return;
+        }
 
         try {
             CompletableFuture<ClientCnx> cnxFuture;
@@ -76,7 +88,8 @@ public class ConnectionHandler {
             } else {
                 cnxFuture = state.client.getConnection(state.topic); //
             }
-            cnxFuture.thenAccept(cnx -> connection.connectionOpened(cnx)) //
+            cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
+                    .thenAccept(__ -> duringConnect.set(false))
                     .exceptionally(this::handleConnectionError);
         } catch (Throwable t) {
             log.warn("[{}] [{}] Exception thrown while getting connection: ", 
state.topic, state.getHandlerName(), t);
@@ -85,25 +98,27 @@ public class ConnectionHandler {
     }
 
     private Void handleConnectionError(Throwable exception) {
-        log.warn("[{}] [{}] Error connecting to broker: {}",
-                state.topic, state.getHandlerName(), exception.getMessage());
-        if (exception instanceof PulsarClientException) {
-            connection.connectionFailed((PulsarClientException) exception);
-        } else if (exception.getCause() instanceof  PulsarClientException) {
-            connection.connectionFailed((PulsarClientException) 
exception.getCause());
-        } else {
-            connection.connectionFailed(new PulsarClientException(exception));
-        }
-
-        State state = this.state.getState();
-        if (state == State.Uninitialized || state == State.Connecting || state 
== State.Ready) {
-            reconnectLater(exception);
+        try {
+            log.warn("[{}] [{}] Error connecting to broker: {}",
+                    state.topic, state.getHandlerName(), 
exception.getMessage());
+            if (exception instanceof PulsarClientException) {
+                connection.connectionFailed((PulsarClientException) exception);
+            } else if (exception.getCause() instanceof PulsarClientException) {
+                connection.connectionFailed((PulsarClientException) 
exception.getCause());
+            } else {
+                connection.connectionFailed(new 
PulsarClientException(exception));
+            }
+        } catch (Throwable throwable) {
+            log.error("[{}] [{}] Unexpected exception after the connection",
+                    state.topic, state.getHandlerName(), throwable);
         }
 
+        reconnectLater(exception);
         return null;
     }
 
-    protected void reconnectLater(Throwable exception) {
+    void reconnectLater(Throwable exception) {
+        duringConnect.set(false);
         CLIENT_CNX_UPDATER.set(this, null);
         if (!isValidStateForReconnection()) {
             log.info("[{}] [{}] Ignoring reconnection request (state: {})",
@@ -127,6 +142,7 @@ public class ConnectionHandler {
 
     public void connectionClosed(ClientCnx cnx) {
         lastConnectionClosedTimestamp = System.currentTimeMillis();
+        duringConnect.set(false);
         state.client.getCnxPool().releaseConnection(cnx);
         if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
             if (!isValidStateForReconnection()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 101d42a6694..f845407435e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -801,16 +801,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     @Override
-    public void connectionOpened(final ClientCnx cnx) {
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
         previousExceptions.clear();
 
-        if (getState() == State.Closing || getState() == State.Closed) {
+        final State state = getState();
+        if (state == State.Closing || state == State.Closed) {
             setState(State.Closed);
             closeConsumerTasks();
             deregisterFromClientCnx();
             client.cleanupConsumer(this);
             clearReceiverQueue();
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
@@ -859,6 +860,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 && startMessageId.equals(initialStartMessageId)) ? 
startMessageRollbackDurationInSec : 0;
 
         // synchronized this, because redeliverUnAckMessage eliminate the 
epoch inconsistency between them
+        final CompletableFuture<Void> future = new CompletableFuture<>();
         synchronized (this) {
             setClientCnx(cnx);
             ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(),
@@ -880,6 +882,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                         deregisterFromClientCnx();
                         client.cleanupConsumer(this);
                         cnx.channel().close();
+                        future.complete(null);
                         return;
                     }
                 }
@@ -892,12 +895,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 if (!(firstTimeConnect && hasParentConsumer) && 
getCurrentReceiverQueueSize() != 0) {
                     increaseAvailablePermits(cnx, 
getCurrentReceiverQueueSize());
                 }
+                future.complete(null);
             }).exceptionally((e) -> {
                 deregisterFromClientCnx();
                 if (getState() == State.Closing || getState() == State.Closed) 
{
                     // Consumer was closed while reconnecting, close the 
connection to make sure the broker
                     // drops the consumer on its side
                     cnx.channel().close();
+                    future.complete(null);
                     return null;
                 }
                 log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
@@ -915,7 +920,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 if (e.getCause() instanceof PulsarClientException
                         && PulsarClientException.isRetriableError(e.getCause())
                         && System.currentTimeMillis() < 
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
-                    reconnectLater(e.getCause());
+                    future.completeExceptionally(e.getCause());
                 } else if (!subscribeFuture.isDone()) {
                     // unable to create new consumer, fail operation
                     setState(State.Failed);
@@ -939,11 +944,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                             topic, subscription, 
cnx.channel().remoteAddress());
                 } else {
                     // consumer was subscribed and connected but we got some 
error, keep trying
-                    reconnectLater(e.getCause());
+                    future.completeExceptionally(e.getCause());
+                }
+
+                if (!future.isDone()) {
+                    future.complete(null);
                 }
                 return null;
             });
         }
+        return future;
     }
 
     protected void consumerIsReconnectedToBroker(ClientCnx cnx, int 
currentQueueSize) {
@@ -1028,7 +1038,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 setState(State.Failed);
                 if (nonRetriableError) {
                     log.info("[{}] Consumer creation failed for consumer {} 
with unretriableError {}",
-                            topic, consumerId, exception);
+                            topic, consumerId, exception.getMessage());
                 } else {
                     log.info("[{}] Consumer creation failed for consumer {} 
after timeout", topic, consumerId);
                 }
@@ -2639,10 +2649,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         setClientCnx(null);
     }
 
-    void reconnectLater(Throwable exception) {
-        this.connectionHandler.reconnectLater(exception);
-    }
-
     void grabCnx() {
         this.connectionHandler.grabCnx();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index c382153f4ae..869aad0fc01 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1620,8 +1620,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
+
     @Override
-    public void connectionOpened(final ClientCnx cnx) {
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
         previousExceptions.clear();
         chunkMaxMessageSize = Math.min(chunkMaxMessageSize, 
ClientCnx.getMaxMessageSize());
 
@@ -1630,7 +1631,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             // Because the state could have been updated while retrieving the 
connection, we set it back to connecting,
             // as long as the change from current state to connecting is a 
valid state change.
             if (!changeToConnecting()) {
-                return;
+                return CompletableFuture.completedFuture(null);
             }
             // We set the cnx reference before registering the producer on the 
cnx, so if the cnx breaks before creating
             // the producer, it will try to grab a new cnx. We also increment 
and get the epoch value for the producer.
@@ -1670,6 +1671,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             }
         }
 
+        final CompletableFuture<Void> future = new CompletableFuture<>();
         cnx.sendRequestWithId(
                 Commands.newProducer(topic, producerId, requestId, 
producerName, conf.isEncryptionEnabled(), metadata,
                         schemaInfo, epoch, userProvidedProducerName,
@@ -1684,11 +1686,13 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                     // We are now reconnected to broker and clear to send 
messages. Re-send all pending messages and
                     // set the cnx pointer so that new messages will be sent 
immediately
                     synchronized (ProducerImpl.this) {
-                        if (getState() == State.Closing || getState() == 
State.Closed) {
+                        State state = getState();
+                        if (state == State.Closing || state == State.Closed) {
                             // Producer was closed while reconnecting, close 
the connection to make sure the broker
                             // drops the producer on its side
                             cnx.removeProducer(producerId);
                             cnx.channel().close();
+                            future.complete(null);
                             return;
                         }
                         resetBackoff();
@@ -1715,13 +1719,16 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
                         resendMessages(cnx, epoch);
                     }
+                    future.complete(null);
                 }).exceptionally((e) -> {
                     Throwable cause = e.getCause();
                     cnx.removeProducer(producerId);
-                    if (getState() == State.Closing || getState() == 
State.Closed) {
+                    State state = getState();
+                    if (state == State.Closing || state == State.Closed) {
                         // Producer was closed while reconnecting, close the 
connection to make sure the broker
                         // drops the producer on its side
                         cnx.channel().close();
+                        future.complete(null);
                         return null;
                     }
 
@@ -1750,6 +1757,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                             }
                             producerCreatedFuture.completeExceptionally(cause);
                         });
+                        future.complete(null);
                         return null;
                     }
                     if (cause instanceof 
PulsarClientException.ProducerBlockedQuotaExceededException) {
@@ -1793,7 +1801,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                                 && System.currentTimeMillis() < 
PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this))) {
                         // Either we had already created the producer once 
(producerCreatedFuture.isDone()) or we are
                         // still within the initial timeout budget and we are 
dealing with a retriable error
-                        reconnectLater(cause);
+                        future.completeExceptionally(cause);
                     } else {
                         setState(State.Failed);
                         producerCreatedFuture.completeExceptionally(cause);
@@ -1805,9 +1813,12 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                             sendTimeout = null;
                         }
                     }
-
+                    if (!future.isDone()) {
+                        future.complete(null);
+                    }
                     return null;
                 });
+        return future;
     }
 
     @Override
@@ -1819,7 +1830,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             if (producerCreatedFuture.completeExceptionally(exception)) {
                 if (nonRetriableError) {
                     log.info("[{}] Producer creation failed for producer {} 
with unretriableError = {}",
-                            topic, producerId, exception);
+                            topic, producerId, exception.getMessage());
                 } else {
                     log.info("[{}] Producer creation failed for producer {} 
after producerTimeout", topic, producerId);
                 }
@@ -2334,10 +2345,6 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         this.connectionHandler.setClientCnx(clientCnx);
     }
 
-    void reconnectLater(Throwable exception) {
-        this.connectionHandler.reconnectLater(exception);
-    }
-
     void grabCnx() {
         this.connectionHandler.grabCnx();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index d2ca018aef9..5ee47c929e4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -89,7 +89,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
             if (watcherFuture.completeExceptionally(exception)) {
                 setState(State.Failed);
                 log.info("[{}] Watcher creation failed for {} with 
non-retriable error {}",
-                        topic, name, exception);
+                        topic, name, exception.getMessage());
                 deregisterFromClientCnx();
             }
         } else {
@@ -98,13 +98,14 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
     }
 
     @Override
-    public void connectionOpened(ClientCnx cnx) {
+    public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
         previousExceptions.clear();
 
-        if (getState() == State.Closing || getState() == State.Closed) {
+        State state = getState();
+        if (state == State.Closing || state == State.Closed) {
             setState(State.Closed);
             deregisterFromClientCnx();
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         log.info("[{}][{}] Creating topic list watcher on cnx {}, watcherId 
{}",
@@ -116,6 +117,7 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
                 .compareAndSet(this, 0L, System.currentTimeMillis()
                         + client.getConfiguration().getOperationTimeoutMs());
 
+        final CompletableFuture<Void> future = new CompletableFuture<>();
         // synchronized this, because redeliverUnAckMessage eliminate the 
epoch inconsistency between them
         synchronized (this) {
             setClientCnx(cnx);
@@ -132,6 +134,7 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
                                 setState(State.Closed);
                                 deregisterFromClientCnx();
                                 cnx.channel().close();
+                                future.complete(null);
                                 return;
                             }
                         }
@@ -139,13 +142,14 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
                         this.connectionHandler.resetBackoff();
 
                         watcherFuture.complete(this);
-
+                        future.complete(null);
                     }).exceptionally((e) -> {
                         deregisterFromClientCnx();
                         if (getState() == State.Closing || getState() == 
State.Closed) {
                             // Watcher was closed while reconnecting, close 
the connection to make sure the broker
                             // drops the watcher on its side
                             cnx.channel().close();
+                            future.complete(null);
                             return null;
                         }
                         log.warn("[{}][{}] Failed to subscribe to topic on 
{}", topic,
@@ -155,7 +159,7 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
                                 && 
PulsarClientException.isRetriableError(e.getCause())
                                 && System.currentTimeMillis()
                                     < 
CREATE_WATCHER_DEADLINE_UPDATER.get(TopicListWatcher.this)) {
-                            reconnectLater(e.getCause());
+                            future.completeExceptionally(e.getCause());
                         } else if (!watcherFuture.isDone()) {
                             // unable to create new watcher, fail operation
                             setState(State.Failed);
@@ -164,11 +168,15 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
                                                     + "when connecting to the 
broker", getHandlerName())));
                         } else {
                             // watcher was subscribed and connected, but we 
got some error, keep trying
-                            reconnectLater(e.getCause());
+                            future.completeExceptionally(e.getCause());
+                        }
+                        if (!future.isDone()) {
+                            future.complete(null);
                         }
                         return null;
                     });
         }
+        return future;
     }
 
     @Override
@@ -249,11 +257,6 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
         setClientCnx(null);
     }
 
-    void reconnectLater(Throwable exception) {
-        this.connectionHandler.reconnectLater(exception);
-    }
-
-
     private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable 
exception) {
         log.info("[{}] Closed topic list watcher", getHandlerName());
         setState(State.Closed);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 15533cfd2a9..3d44b789135 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -123,14 +123,17 @@ public class TransactionMetaStoreHandler extends HandlerState
     }
 
     @Override
-    public void connectionOpened(ClientCnx cnx) {
+    public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
         internalPinnedExecutor.execute(() -> {
             LOG.info("Transaction meta handler with transaction coordinator id 
{} connection opened.",
                     transactionCoordinatorId);
 
-            if (getState() == State.Closing || getState() == State.Closed) {
+            State state = getState();
+            if (state == State.Closing || state == State.Closed) {
                 setState(State.Closed);
                 failPendingRequest();
+                future.complete(null);
                 return;
             }
 
@@ -146,6 +149,7 @@ public class TransactionMetaStoreHandler extends 
HandlerState
                             this.connectionHandler.resetBackoff();
                             pendingRequests.forEach((requestID, opBase) -> 
checkStateAndSendRequest(opBase));
                         }
+                        future.complete(null);
                     });
                 }).exceptionally((e) -> {
                     internalPinnedExecutor.execute(() -> {
@@ -155,16 +159,19 @@ public class TransactionMetaStoreHandler extends 
HandlerState
                                 || e.getCause() instanceof 
PulsarClientException.NotAllowedException) {
                             setState(State.Closed);
                             cnx.channel().close();
+                            future.complete(null);
                         } else {
-                            connectionHandler.reconnectLater(e.getCause());
+                            future.completeExceptionally(e.getCause());
                         }
                     });
                     return null;
                 });
             } else {
                 registerToConnection(cnx);
+                future.complete(null);
             }
         });
+        return future;
     }
 
     private boolean registerToConnection(ClientCnx cnx) {

Reply via email to