This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new d5631c961c1 [fix][client] Make the whole grabCnx() progress atomic (#20595)
d5631c961c1 is described below

commit d5631c961c1a026d7deef2c5405b1456f0dbfd77
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jun 30 16:54:17 2023 +0800

    [fix][client] Make the whole grabCnx() progress atomic (#20595)
    
    (cherry picked from commit 2bede012c73c301b73c079aaeb122cbb472728c1)
---
 .../pulsar/client/impl/ConnectionHandlerTest.java  | 154 +++++++++++++++++++++
 .../pulsar/client/impl/ConnectionHandler.java      |  49 ++++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  26 ++--
 .../apache/pulsar/client/impl/ProducerImpl.java    |  29 ++--
 .../client/impl/TransactionMetaStoreHandler.java   |  13 +-
 5 files changed, 231 insertions(+), 40 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
new file mode 100644
index 00000000000..d93d87c9137
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class ConnectionHandlerTest extends ProducerConsumerBase {
+
+    private static final Backoff BACKOFF = new 
BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
+            .setMandatoryStop(1, TimeUnit.SECONDS)
+            .setMax(3, TimeUnit.SECONDS).create();
+    private final ExecutorService executor = Executors.newFixedThreadPool(4);
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        executor.shutdown();
+    }
+
+    @Test(timeOut = 30000)
+    public void testSynchronousGrabCnx() {
+        for (int i = 0; i < 10; i++) {
+            final CompletableFuture<Integer> future = new 
CompletableFuture<>();
+            final int index = i;
+            final ConnectionHandler handler = new ConnectionHandler(
+                    new MockedHandlerState((PulsarClientImpl) pulsarClient, 
"my-topic"), BACKOFF,
+                    cnx -> {
+                        future.complete(index);
+                        return CompletableFuture.completedFuture(null);
+                    });
+            handler.grabCnx();
+            Assert.assertEquals(future.join().intValue(), i);
+        }
+    }
+
+    @Test
+    public void testConcurrentGrabCnx() {
+        final AtomicInteger cnt = new AtomicInteger(0);
+        final ConnectionHandler handler = new ConnectionHandler(
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, 
"my-topic"), BACKOFF,
+                cnx -> {
+                    cnt.incrementAndGet();
+                    return CompletableFuture.completedFuture(null);
+                });
+        final int numGrab = 10;
+        for (int i = 0; i < numGrab; i++) {
+            handler.grabCnx();
+        }
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> cnt.get() 
> 0);
+        Assert.assertThrows(ConditionTimeoutException.class,
+                () -> 
Awaitility.await().atMost(Duration.ofMillis(500)).until(() -> cnt.get() == 
numGrab));
+        Assert.assertEquals(cnt.get(), 1);
+    }
+
+    @Test
+    public void testDuringConnectInvokeCount() throws IllegalAccessException {
+        // 1. connectionOpened completes with null
+        final AtomicBoolean duringConnect = spy(new AtomicBoolean());
+        final ConnectionHandler handler1 = new ConnectionHandler(
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, 
"my-topic"), BACKOFF,
+                cnx -> CompletableFuture.completedFuture(null));
+        FieldUtils.writeField(handler1, "duringConnect", duringConnect, true);
+        handler1.grabCnx();
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> 
!duringConnect.get());
+        verify(duringConnect, times(1)).compareAndSet(false, true);
+        verify(duringConnect, times(1)).set(false);
+
+        // 2. connectionFailed is called
+        final ConnectionHandler handler2 = new ConnectionHandler(
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, null), 
new MockedBackoff(),
+                cnx -> CompletableFuture.completedFuture(null));
+        FieldUtils.writeField(handler2, "duringConnect", duringConnect, true);
+        handler2.grabCnx();
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> 
!duringConnect.get());
+        verify(duringConnect, times(2)).compareAndSet(false, true);
+        verify(duringConnect, times(2)).set(false);
+
+        // 3. connectionOpened completes exceptionally
+        final ConnectionHandler handler3 = new ConnectionHandler(
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, 
"my-topic"), new MockedBackoff(),
+                cnx -> FutureUtil.failedFuture(new RuntimeException("fail")));
+        FieldUtils.writeField(handler3, "duringConnect", duringConnect, true);
+        handler3.grabCnx();
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> 
!duringConnect.get());
+        verify(duringConnect, times(3)).compareAndSet(false, true);
+        verify(duringConnect, times(3)).set(false);
+    }
+
+    private static class MockedHandlerState extends HandlerState {
+
+        public MockedHandlerState(PulsarClientImpl client, String topic) {
+            super(client, topic);
+        }
+
+        @Override
+        String getHandlerName() {
+            return "mocked";
+        }
+    }
+
+    private static class MockedBackoff extends Backoff {
+
+        // Set a large backoff so that reconnection won't happen in tests
+        public MockedBackoff() {
+            super(1, TimeUnit.HOURS, 2, TimeUnit.HOURS, 1, TimeUnit.HOURS);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index a951d7b2cb8..1d9c28d90ea 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -39,10 +41,16 @@ public class ConnectionHandler {
     // Start with -1L because it gets incremented before sending on the first 
connection
     private volatile long epoch = -1L;
     protected volatile long lastConnectionClosedTimestamp = 0L;
+    private final AtomicBoolean duringConnect = new AtomicBoolean(false);
 
     interface Connection {
-        void connectionFailed(PulsarClientException exception);
-        void connectionOpened(ClientCnx cnx);
+
+        /**
+         * @apiNote If the returned future is completed exceptionally, 
reconnectLater will be called.
+         */
+        CompletableFuture<Void> connectionOpened(ClientCnx cnx);
+        default void connectionFailed(PulsarClientException e) {
+        }
     }
 
     protected Connection connection;
@@ -67,10 +75,16 @@ public class ConnectionHandler {
                     state.topic, state.getHandlerName(), state.getState());
             return;
         }
+        if (!duringConnect.compareAndSet(false, true)) {
+            log.info("[{}] [{}] Skip grabbing the connection since there is a 
pending connection",
+                    state.topic, state.getHandlerName());
+            return;
+        }
 
         try {
             state.client.getConnection(state.topic) //
                     .thenAccept(cnx -> connection.connectionOpened(cnx)) //
+                    .thenAccept(__ -> duringConnect.set(false))
                     .exceptionally(this::handleConnectionError);
         } catch (Throwable t) {
             log.warn("[{}] [{}] Exception thrown while getting connection: ", 
state.topic, state.getHandlerName(), t);
@@ -79,25 +93,27 @@ public class ConnectionHandler {
     }
 
     private Void handleConnectionError(Throwable exception) {
-        log.warn("[{}] [{}] Error connecting to broker: {}",
-                state.topic, state.getHandlerName(), exception.getMessage());
-        if (exception instanceof PulsarClientException) {
-            connection.connectionFailed((PulsarClientException) exception);
-        } else if (exception.getCause() instanceof  PulsarClientException) {
-            connection.connectionFailed((PulsarClientException) 
exception.getCause());
-        } else {
-            connection.connectionFailed(new PulsarClientException(exception));
-        }
-
-        State state = this.state.getState();
-        if (state == State.Uninitialized || state == State.Connecting || state 
== State.Ready) {
-            reconnectLater(exception);
+        try {
+            log.warn("[{}] [{}] Error connecting to broker: {}",
+                    state.topic, state.getHandlerName(), 
exception.getMessage());
+            if (exception instanceof PulsarClientException) {
+                connection.connectionFailed((PulsarClientException) exception);
+            } else if (exception.getCause() instanceof PulsarClientException) {
+                connection.connectionFailed((PulsarClientException) 
exception.getCause());
+            } else {
+                connection.connectionFailed(new 
PulsarClientException(exception));
+            }
+        } catch (Throwable throwable) {
+            log.error("[{}] [{}] Unexpected exception after the connection",
+                    state.topic, state.getHandlerName(), throwable);
         }
 
+        reconnectLater(exception);
         return null;
     }
 
-    protected void reconnectLater(Throwable exception) {
+    void reconnectLater(Throwable exception) {
+        duringConnect.set(false);
         CLIENT_CNX_UPDATER.set(this, null);
         if (!isValidStateForReconnection()) {
             log.info("[{}] [{}] Ignoring reconnection request (state: {})",
@@ -121,6 +137,7 @@ public class ConnectionHandler {
 
     public void connectionClosed(ClientCnx cnx) {
         lastConnectionClosedTimestamp = System.currentTimeMillis();
+        duringConnect.set(false);
         state.client.getCnxPool().releaseConnection(cnx);
         if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
             if (!isValidStateForReconnection()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 453fd528432..4a7fef11af4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -765,16 +765,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     @Override
-    public void connectionOpened(final ClientCnx cnx) {
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
         previousExceptions.clear();
 
-        if (getState() == State.Closing || getState() == State.Closed) {
+        final State state = getState();
+        if (state == State.Closing || state == State.Closed) {
             setState(State.Closed);
             closeConsumerTasks();
             deregisterFromClientCnx();
             client.cleanupConsumer(this);
             clearReceiverQueue();
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
@@ -823,6 +824,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 && startMessageId.equals(initialStartMessageId)) ? 
startMessageRollbackDurationInSec : 0;
 
         // synchronized this, because redeliverUnAckMessage eliminate the 
epoch inconsistency between them
+        final CompletableFuture<Void> future = new CompletableFuture<>();
         synchronized (this) {
             setClientCnx(cnx);
             ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(),
@@ -844,6 +846,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                         deregisterFromClientCnx();
                         client.cleanupConsumer(this);
                         cnx.channel().close();
+                        future.complete(null);
                         return;
                     }
                 }
@@ -856,12 +859,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 if (!(firstTimeConnect && hasParentConsumer) && 
conf.getReceiverQueueSize() != 0) {
                     increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
                 }
+                future.complete(null);
             }).exceptionally((e) -> {
                 deregisterFromClientCnx();
                 if (getState() == State.Closing || getState() == State.Closed) 
{
                     // Consumer was closed while reconnecting, close the 
connection to make sure the broker
                     // drops the consumer on its side
                     cnx.channel().close();
+                    future.complete(null);
                     return null;
                 }
                 log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
@@ -879,7 +884,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 if (e.getCause() instanceof PulsarClientException
                         && PulsarClientException.isRetriableError(e.getCause())
                         && System.currentTimeMillis() < 
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
-                    reconnectLater(e.getCause());
+                    future.completeExceptionally(e.getCause());
                 } else if (!subscribeFuture.isDone()) {
                     // unable to create new consumer, fail operation
                     setState(State.Failed);
@@ -903,11 +908,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                             topic, subscription, 
cnx.channel().remoteAddress());
                 } else {
                     // consumer was subscribed and connected but we got some 
error, keep trying
-                    reconnectLater(e.getCause());
+                    future.completeExceptionally(e.getCause());
+                }
+
+                if (!future.isDone()) {
+                    future.complete(null);
                 }
                 return null;
             });
         }
+        return future;
     }
 
     protected void consumerIsReconnectedToBroker(ClientCnx cnx, int 
currentQueueSize) {
@@ -992,7 +1002,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 setState(State.Failed);
                 if (nonRetriableError) {
                     log.info("[{}] Consumer creation failed for consumer {} 
with unretriableError {}",
-                            topic, consumerId, exception);
+                            topic, consumerId, exception.getMessage());
                 } else {
                     log.info("[{}] Consumer creation failed for consumer {} 
after timeout", topic, consumerId);
                 }
@@ -2580,10 +2590,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         setClientCnx(null);
     }
 
-    void reconnectLater(Throwable exception) {
-        this.connectionHandler.reconnectLater(exception);
-    }
-
     void grabCnx() {
         this.connectionHandler.grabCnx();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index f2334650ad8..a8ef3236f56 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1536,8 +1536,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
     }
 
+
     @Override
-    public void connectionOpened(final ClientCnx cnx) {
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
         previousExceptions.clear();
 
         final long epoch;
@@ -1545,7 +1546,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             // Because the state could have been updated while retrieving the 
connection, we set it back to connecting,
             // as long as the change from current state to connecting is a 
valid state change.
             if (!changeToConnecting()) {
-                return;
+                return CompletableFuture.completedFuture(null);
             }
             // We set the cnx reference before registering the producer on the 
cnx, so if the cnx breaks before creating
             // the producer, it will try to grab a new cnx. We also increment 
and get the epoch value for the producer.
@@ -1585,6 +1586,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             }
         }
 
+        final CompletableFuture<Void> future = new CompletableFuture<>();
         cnx.sendRequestWithId(
                 Commands.newProducer(topic, producerId, requestId, 
producerName, conf.isEncryptionEnabled(), metadata,
                         schemaInfo, epoch, userProvidedProducerName,
@@ -1599,11 +1601,13 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                     // We are now reconnected to broker and clear to send 
messages. Re-send all pending messages and
                     // set the cnx pointer so that new messages will be sent 
immediately
                     synchronized (ProducerImpl.this) {
-                        if (getState() == State.Closing || getState() == 
State.Closed) {
+                        State state = getState();
+                        if (state == State.Closing || state == State.Closed) {
                             // Producer was closed while reconnecting, close 
the connection to make sure the broker
                             // drops the producer on its side
                             cnx.removeProducer(producerId);
                             cnx.channel().close();
+                            future.complete(null);
                             return;
                         }
                         resetBackoff();
@@ -1653,13 +1657,16 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                         }
                         resendMessages(cnx, epoch);
                     }
+                    future.complete(null);
                 }).exceptionally((e) -> {
                     Throwable cause = e.getCause();
                     cnx.removeProducer(producerId);
-                    if (getState() == State.Closing || getState() == 
State.Closed) {
+                    State state = getState();
+                    if (state == State.Closing || state == State.Closed) {
                         // Producer was closed while reconnecting, close the 
connection to make sure the broker
                         // drops the producer on its side
                         cnx.channel().close();
+                        future.complete(null);
                         return null;
                     }
 
@@ -1688,6 +1695,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                             }
                             producerCreatedFuture.completeExceptionally(cause);
                         });
+                        future.complete(null);
                         return null;
                     }
                     if (cause instanceof 
PulsarClientException.ProducerBlockedQuotaExceededException) {
@@ -1731,7 +1739,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                                 && System.currentTimeMillis() < 
PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this))) {
                         // Either we had already created the producer once 
(producerCreatedFuture.isDone()) or we are
                         // still within the initial timeout budget and we are 
dealing with a retriable error
-                        reconnectLater(cause);
+                        future.completeExceptionally(cause);
                     } else {
                         setState(State.Failed);
                         producerCreatedFuture.completeExceptionally(cause);
@@ -1743,9 +1751,12 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                             sendTimeout = null;
                         }
                     }
-
+                    if (!future.isDone()) {
+                        future.complete(null);
+                    }
                     return null;
                 });
+        return future;
     }
 
     @Override
@@ -1757,7 +1768,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             if (producerCreatedFuture.completeExceptionally(exception)) {
                 if (nonRetriableError) {
                     log.info("[{}] Producer creation failed for producer {} 
with unretriableError = {}",
-                            topic, producerId, exception);
+                            topic, producerId, exception.getMessage());
                 } else {
                     log.info("[{}] Producer creation failed for producer {} 
after producerTimeout", topic, producerId);
                 }
@@ -2183,10 +2194,6 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         this.connectionHandler.setClientCnx(clientCnx);
     }
 
-    void reconnectLater(Throwable exception) {
-        this.connectionHandler.reconnectLater(exception);
-    }
-
     void grabCnx() {
         this.connectionHandler.grabCnx();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index baff5deb3f1..56443709bf3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -123,14 +123,17 @@ public class TransactionMetaStoreHandler extends 
HandlerState
     }
 
     @Override
-    public void connectionOpened(ClientCnx cnx) {
+    public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
         internalPinnedExecutor.execute(() -> {
             LOG.info("Transaction meta handler with transaction coordinator id 
{} connection opened.",
                     transactionCoordinatorId);
 
-            if (getState() == State.Closing || getState() == State.Closed) {
+            State state = getState();
+            if (state == State.Closing || state == State.Closed) {
                 setState(State.Closed);
                 failPendingRequest();
+                future.complete(null);
                 return;
             }
 
@@ -146,6 +149,7 @@ public class TransactionMetaStoreHandler extends 
HandlerState
                             this.connectionHandler.resetBackoff();
                             pendingRequests.forEach((requestID, opBase) -> 
checkStateAndSendRequest(opBase));
                         }
+                        future.complete(null);
                     });
                 }).exceptionally((e) -> {
                     internalPinnedExecutor.execute(() -> {
@@ -155,16 +159,19 @@ public class TransactionMetaStoreHandler extends 
HandlerState
                                 || e.getCause() instanceof 
PulsarClientException.NotAllowedException) {
                             setState(State.Closed);
                             cnx.channel().close();
+                            future.complete(null);
                         } else {
-                            connectionHandler.reconnectLater(e.getCause());
+                            future.completeExceptionally(e.getCause());
                         }
                     });
                     return null;
                 });
             } else {
                 registerToConnection(cnx);
+                future.complete(null);
             }
         });
+        return future;
     }
 
     private boolean registerToConnection(ClientCnx cnx) {

Reply via email to