This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2bede012c73 [fix][client] Make the whole grabCnx() progress atomic
(#20595)
2bede012c73 is described below
commit 2bede012c73c301b73c079aaeb122cbb472728c1
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jun 30 16:54:17 2023 +0800
[fix][client] Make the whole grabCnx() progress atomic (#20595)
---
.../pulsar/client/impl/ConnectionHandlerTest.java | 154 +++++++++++++++++++++
.../pulsar/client/impl/ConnectionHandler.java | 50 ++++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 26 ++--
.../apache/pulsar/client/impl/ProducerImpl.java | 29 ++--
.../pulsar/client/impl/TopicListWatcher.java | 27 ++--
.../client/impl/TransactionMetaStoreHandler.java | 13 +-
6 files changed, 246 insertions(+), 53 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
new file mode 100644
index 00000000000..f29d62db5f4
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class ConnectionHandlerTest extends ProducerConsumerBase {
+
+ private static final Backoff BACKOFF = new
BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(1, TimeUnit.SECONDS)
+ .setMax(3, TimeUnit.SECONDS).create();
+ private final ExecutorService executor = Executors.newFixedThreadPool(4);
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ executor.shutdown();
+ }
+
+ @Test(timeOut = 30000)
+ public void testSynchronousGrabCnx() {
+ for (int i = 0; i < 10; i++) {
+ final CompletableFuture<Integer> future = new
CompletableFuture<>();
+ final int index = i;
+ final ConnectionHandler handler = new ConnectionHandler(
+ new MockedHandlerState((PulsarClientImpl) pulsarClient,
"my-topic"), BACKOFF,
+ cnx -> {
+ future.complete(index);
+ return CompletableFuture.completedFuture(null);
+ });
+ handler.grabCnx();
+ Assert.assertEquals(future.join(), i);
+ }
+ }
+
+ @Test
+ public void testConcurrentGrabCnx() {
+ final AtomicInteger cnt = new AtomicInteger(0);
+ final ConnectionHandler handler = new ConnectionHandler(
+ new MockedHandlerState((PulsarClientImpl) pulsarClient,
"my-topic"), BACKOFF,
+ cnx -> {
+ cnt.incrementAndGet();
+ return CompletableFuture.completedFuture(null);
+ });
+ final int numGrab = 10;
+ for (int i = 0; i < numGrab; i++) {
+ handler.grabCnx();
+ }
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> cnt.get()
> 0);
+ Assert.assertThrows(ConditionTimeoutException.class,
+ () ->
Awaitility.await().atMost(Duration.ofMillis(500)).until(() -> cnt.get() ==
numGrab));
+ Assert.assertEquals(cnt.get(), 1);
+ }
+
+ @Test
+ public void testDuringConnectInvokeCount() throws IllegalAccessException {
+ // 1. connectionOpened completes with null
+ final AtomicBoolean duringConnect = spy(new AtomicBoolean());
+ final ConnectionHandler handler1 = new ConnectionHandler(
+ new MockedHandlerState((PulsarClientImpl) pulsarClient,
"my-topic"), BACKOFF,
+ cnx -> CompletableFuture.completedFuture(null));
+ FieldUtils.writeField(handler1, "duringConnect", duringConnect, true);
+ handler1.grabCnx();
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() ->
!duringConnect.get());
+ verify(duringConnect, times(1)).compareAndSet(false, true);
+ verify(duringConnect, times(1)).set(false);
+
+ // 2. connectionFailed is called
+ final ConnectionHandler handler2 = new ConnectionHandler(
+ new MockedHandlerState((PulsarClientImpl) pulsarClient, null),
new MockedBackoff(),
+ cnx -> CompletableFuture.completedFuture(null));
+ FieldUtils.writeField(handler2, "duringConnect", duringConnect, true);
+ handler2.grabCnx();
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() ->
!duringConnect.get());
+ verify(duringConnect, times(2)).compareAndSet(false, true);
+ verify(duringConnect, times(2)).set(false);
+
+ // 3. connectionOpened completes exceptionally
+ final ConnectionHandler handler3 = new ConnectionHandler(
+ new MockedHandlerState((PulsarClientImpl) pulsarClient,
"my-topic"), new MockedBackoff(),
+ cnx -> FutureUtil.failedFuture(new RuntimeException("fail")));
+ FieldUtils.writeField(handler3, "duringConnect", duringConnect, true);
+ handler3.grabCnx();
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() ->
!duringConnect.get());
+ verify(duringConnect, times(3)).compareAndSet(false, true);
+ verify(duringConnect, times(3)).set(false);
+ }
+
+ private static class MockedHandlerState extends HandlerState {
+
+ public MockedHandlerState(PulsarClientImpl client, String topic) {
+ super(client, topic);
+ }
+
+ @Override
+ String getHandlerName() {
+ return "mocked";
+ }
+ }
+
+ private static class MockedBackoff extends Backoff {
+
+ // Set a large backoff so that reconnection won't happen in tests
+ public MockedBackoff() {
+ super(1, TimeUnit.HOURS, 2, TimeUnit.HOURS, 1, TimeUnit.HOURS);
+ }
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 046cb90643a..365abce3b90 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -41,10 +42,16 @@ public class ConnectionHandler {
// Start with -1L because it gets incremented before sending on the first
connection
private volatile long epoch = -1L;
protected volatile long lastConnectionClosedTimestamp = 0L;
+ private final AtomicBoolean duringConnect = new AtomicBoolean(false);
interface Connection {
- void connectionFailed(PulsarClientException exception);
- void connectionOpened(ClientCnx cnx);
+
+ /**
+ * @apiNote If the returned future is completed exceptionally,
reconnectLater will be called.
+ */
+ CompletableFuture<Void> connectionOpened(ClientCnx cnx);
+ default void connectionFailed(PulsarClientException e) {
+ }
}
protected Connection connection;
@@ -69,6 +76,11 @@ public class ConnectionHandler {
state.topic, state.getHandlerName(), state.getState());
return;
}
+ if (!duringConnect.compareAndSet(false, true)) {
+ log.info("[{}] [{}] Skip grabbing the connection since there is a
pending connection",
+ state.topic, state.getHandlerName());
+ return;
+ }
try {
CompletableFuture<ClientCnx> cnxFuture;
@@ -81,7 +93,8 @@ public class ConnectionHandler {
} else {
cnxFuture = state.client.getConnection(state.topic); //
}
- cnxFuture.thenAccept(cnx -> connection.connectionOpened(cnx)) //
+ cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
+ .thenAccept(__ -> duringConnect.set(false))
.exceptionally(this::handleConnectionError);
} catch (Throwable t) {
log.warn("[{}] [{}] Exception thrown while getting connection: ",
state.topic, state.getHandlerName(), t);
@@ -90,25 +103,27 @@ public class ConnectionHandler {
}
private Void handleConnectionError(Throwable exception) {
- log.warn("[{}] [{}] Error connecting to broker: {}",
- state.topic, state.getHandlerName(), exception.getMessage());
- if (exception instanceof PulsarClientException) {
- connection.connectionFailed((PulsarClientException) exception);
- } else if (exception.getCause() instanceof PulsarClientException) {
- connection.connectionFailed((PulsarClientException)
exception.getCause());
- } else {
- connection.connectionFailed(new PulsarClientException(exception));
- }
-
- State state = this.state.getState();
- if (state == State.Uninitialized || state == State.Connecting || state
== State.Ready) {
- reconnectLater(exception);
+ try {
+ log.warn("[{}] [{}] Error connecting to broker: {}",
+ state.topic, state.getHandlerName(),
exception.getMessage());
+ if (exception instanceof PulsarClientException) {
+ connection.connectionFailed((PulsarClientException) exception);
+ } else if (exception.getCause() instanceof PulsarClientException) {
+ connection.connectionFailed((PulsarClientException)
exception.getCause());
+ } else {
+ connection.connectionFailed(new
PulsarClientException(exception));
+ }
+ } catch (Throwable throwable) {
+ log.error("[{}] [{}] Unexpected exception after the connection",
+ state.topic, state.getHandlerName(), throwable);
}
+ reconnectLater(exception);
return null;
}
- protected void reconnectLater(Throwable exception) {
+ void reconnectLater(Throwable exception) {
+ duringConnect.set(false);
CLIENT_CNX_UPDATER.set(this, null);
if (!isValidStateForReconnection()) {
log.info("[{}] [{}] Ignoring reconnection request (state: {})",
@@ -132,6 +147,7 @@ public class ConnectionHandler {
public void connectionClosed(ClientCnx cnx) {
lastConnectionClosedTimestamp = System.currentTimeMillis();
+ duringConnect.set(false);
state.client.getCnxPool().releaseConnection(cnx);
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
if (!isValidStateForReconnection()) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 4a84e765065..b0d7d3a0f8b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -760,16 +760,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
- public void connectionOpened(final ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
- if (getState() == State.Closing || getState() == State.Closed) {
+ final State state = getState();
+ if (state == State.Closing || state == State.Closed) {
setState(State.Closed);
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
clearReceiverQueue();
- return;
+ return CompletableFuture.completedFuture(null);
}
log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
@@ -823,6 +824,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
&& startMessageId.equals(initialStartMessageId)) ?
startMessageRollbackDurationInSec : 0;
// synchronized this, because redeliverUnAckMessage eliminate the
epoch inconsistency between them
+ final CompletableFuture<Void> future = new CompletableFuture<>();
synchronized (this) {
setClientCnx(cnx);
ByteBuf request = Commands.newSubscribe(topic, subscription,
consumerId, requestId, getSubType(),
@@ -844,6 +846,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
deregisterFromClientCnx();
client.cleanupConsumer(this);
cnx.channel().close();
+ future.complete(null);
return;
}
}
@@ -856,12 +859,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (!(firstTimeConnect && hasParentConsumer) &&
getCurrentReceiverQueueSize() != 0) {
increaseAvailablePermits(cnx,
getCurrentReceiverQueueSize());
}
+ future.complete(null);
}).exceptionally((e) -> {
deregisterFromClientCnx();
if (getState() == State.Closing || getState() == State.Closed)
{
// Consumer was closed while reconnecting, close the
connection to make sure the broker
// drops the consumer on its side
cnx.channel().close();
+ future.complete(null);
return null;
}
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
@@ -879,7 +884,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (e.getCause() instanceof PulsarClientException
&& PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis() <
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
- reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
} else if (!subscribeFuture.isDone()) {
// unable to create new consumer, fail operation
setState(State.Failed);
@@ -903,11 +908,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
topic, subscription,
cnx.channel().remoteAddress());
} else {
// consumer was subscribed and connected but we got some
error, keep trying
- reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
+ }
+
+ if (!future.isDone()) {
+ future.complete(null);
}
return null;
});
}
+ return future;
}
protected void consumerIsReconnectedToBroker(ClientCnx cnx, int
currentQueueSize) {
@@ -991,7 +1001,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
setState(State.Failed);
if (nonRetriableError) {
log.info("[{}] Consumer creation failed for consumer {}
with unretriableError {}",
- topic, consumerId, exception);
+ topic, consumerId, exception.getMessage());
} else {
log.info("[{}] Consumer creation failed for consumer {}
after timeout", topic, consumerId);
}
@@ -2590,10 +2600,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
setClientCnx(null);
}
- void reconnectLater(Throwable exception) {
- this.connectionHandler.reconnectLater(exception);
- }
-
void grabCnx() {
this.connectionHandler.grabCnx();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 2192ebfb64e..267b06649d7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1649,8 +1649,9 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
}
+
@Override
- public void connectionOpened(final ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
chunkMaxMessageSize = Math.min(chunkMaxMessageSize,
ClientCnx.getMaxMessageSize());
@@ -1659,7 +1660,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
// Because the state could have been updated while retrieving the
connection, we set it back to connecting,
// as long as the change from current state to connecting is a
valid state change.
if (!changeToConnecting()) {
- return;
+ return CompletableFuture.completedFuture(null);
}
// We set the cnx reference before registering the producer on the
cnx, so if the cnx breaks before creating
// the producer, it will try to grab a new cnx. We also increment
and get the epoch value for the producer.
@@ -1699,6 +1700,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
}
+ final CompletableFuture<Void> future = new CompletableFuture<>();
cnx.sendRequestWithId(
Commands.newProducer(topic, producerId, requestId,
producerName, conf.isEncryptionEnabled(), metadata,
schemaInfo, epoch, userProvidedProducerName,
@@ -1713,11 +1715,13 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
// We are now reconnected to broker and clear to send
messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent
immediately
synchronized (ProducerImpl.this) {
- if (getState() == State.Closing || getState() ==
State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
// Producer was closed while reconnecting, close
the connection to make sure the broker
// drops the producer on its side
cnx.removeProducer(producerId);
cnx.channel().close();
+ future.complete(null);
return;
}
resetBackoff();
@@ -1744,13 +1748,16 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
resendMessages(cnx, epoch);
}
+ future.complete(null);
}).exceptionally((e) -> {
Throwable cause = e.getCause();
cnx.removeProducer(producerId);
- if (getState() == State.Closing || getState() ==
State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
// Producer was closed while reconnecting, close the
connection to make sure the broker
// drops the producer on its side
cnx.channel().close();
+ future.complete(null);
return null;
}
@@ -1779,6 +1786,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
producerCreatedFuture.completeExceptionally(cause);
});
+ future.complete(null);
return null;
}
if (cause instanceof
PulsarClientException.ProducerBlockedQuotaExceededException) {
@@ -1822,7 +1830,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
&& System.currentTimeMillis() <
PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this))) {
// Either we had already created the producer once
(producerCreatedFuture.isDone()) or we are
// still within the initial timeout budget and we are
dealing with a retriable error
- reconnectLater(cause);
+ future.completeExceptionally(cause);
} else {
setState(State.Failed);
producerCreatedFuture.completeExceptionally(cause);
@@ -1834,9 +1842,12 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
sendTimeout = null;
}
}
-
+ if (!future.isDone()) {
+ future.complete(null);
+ }
return null;
});
+ return future;
}
@Override
@@ -1848,7 +1859,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
if (producerCreatedFuture.completeExceptionally(exception)) {
if (nonRetriableError) {
log.info("[{}] Producer creation failed for producer {}
with unretriableError = {}",
- topic, producerId, exception);
+ topic, producerId, exception.getMessage());
} else {
log.info("[{}] Producer creation failed for producer {}
after producerTimeout", topic, producerId);
}
@@ -2364,10 +2375,6 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
this.connectionHandler.setClientCnx(clientCnx);
}
- void reconnectLater(Throwable exception) {
- this.connectionHandler.reconnectLater(exception);
- }
-
void grabCnx() {
this.connectionHandler.grabCnx();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 384d1b688b8..8fe1e9bbf0f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -89,7 +89,7 @@ public class TopicListWatcher extends HandlerState implements
ConnectionHandler.
if (watcherFuture.completeExceptionally(exception)) {
setState(State.Failed);
log.info("[{}] Watcher creation failed for {} with
non-retriable error {}",
- topic, name, exception);
+ topic, name, exception.getMessage());
deregisterFromClientCnx();
}
} else {
@@ -98,13 +98,14 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
}
@Override
- public void connectionOpened(ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
previousExceptions.clear();
- if (getState() == State.Closing || getState() == State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
setState(State.Closed);
deregisterFromClientCnx();
- return;
+ return CompletableFuture.completedFuture(null);
}
log.info("[{}][{}] Creating topic list watcher on cnx {}, watcherId
{}",
@@ -116,6 +117,7 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
.compareAndSet(this, 0L, System.currentTimeMillis()
+ client.getConfiguration().getOperationTimeoutMs());
+ final CompletableFuture<Void> future = new CompletableFuture<>();
// synchronized this, because redeliverUnAckMessage eliminate the
epoch inconsistency between them
synchronized (this) {
setClientCnx(cnx);
@@ -132,6 +134,7 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
setState(State.Closed);
deregisterFromClientCnx();
cnx.channel().close();
+ future.complete(null);
return;
}
}
@@ -139,13 +142,14 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
this.connectionHandler.resetBackoff();
watcherFuture.complete(this);
-
+ future.complete(null);
}).exceptionally((e) -> {
deregisterFromClientCnx();
if (getState() == State.Closing || getState() ==
State.Closed) {
// Watcher was closed while reconnecting, close
the connection to make sure the broker
// drops the watcher on its side
cnx.channel().close();
+ future.complete(null);
return null;
}
log.warn("[{}][{}] Failed to create topic list watcher
on {}",
@@ -155,7 +159,7 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
&&
PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis()
<
CREATE_WATCHER_DEADLINE_UPDATER.get(TopicListWatcher.this)) {
- reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
} else if (!watcherFuture.isDone()) {
// unable to create new watcher, fail operation
setState(State.Failed);
@@ -164,11 +168,15 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
+ "when connecting to the
broker", getHandlerName())));
} else {
// watcher was subscribed and connected, but we
got some error, keep trying
- reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
+ }
+ if (!future.isDone()) {
+ future.complete(null);
}
return null;
});
}
+ return future;
}
@Override
@@ -249,11 +257,6 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
setClientCnx(null);
}
- void reconnectLater(Throwable exception) {
- this.connectionHandler.reconnectLater(exception);
- }
-
-
private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable
exception) {
log.info("[{}] Closed topic list watcher", getHandlerName());
setState(State.Closed);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 601fa2b8f81..ebbfca0c3cb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -123,14 +123,17 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
@Override
- public void connectionOpened(ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
LOG.info("Transaction meta handler with transaction coordinator id
{} connection opened.",
transactionCoordinatorId);
- if (getState() == State.Closing || getState() == State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
setState(State.Closed);
failPendingRequest();
+ future.complete(null);
return;
}
@@ -146,6 +149,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
this.connectionHandler.resetBackoff();
pendingRequests.forEach((requestID, opBase) ->
checkStateAndSendRequest(opBase));
}
+ future.complete(null);
});
}).exceptionally((e) -> {
internalPinnedExecutor.execute(() -> {
@@ -155,16 +159,19 @@ public class TransactionMetaStoreHandler extends
HandlerState
|| e.getCause() instanceof
PulsarClientException.NotAllowedException) {
setState(State.Closed);
cnx.channel().close();
+ future.complete(null);
} else {
- connectionHandler.reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
}
});
return null;
});
} else {
registerToConnection(cnx);
+ future.complete(null);
}
});
+ return future;
}
private boolean registerToConnection(ClientCnx cnx) {