This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new daa3066265c [fix][client] Make the whole grabCnx() progress atomic
(#20595)
daa3066265c is described below
commit daa3066265c8cc02efdd07dedffbc3878541bd79
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jun 30 16:54:17 2023 +0800
[fix][client] Make the whole grabCnx() progress atomic (#20595)
---
.../pulsar/client/impl/ConnectionHandlerTest.java | 154 +++++++++++++++++++++
.../pulsar/client/impl/ConnectionHandler.java | 50 ++++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 26 ++--
.../apache/pulsar/client/impl/ProducerImpl.java | 29 ++--
.../pulsar/client/impl/TopicListWatcher.java | 27 ++--
.../client/impl/TransactionMetaStoreHandler.java | 13 +-
6 files changed, 246 insertions(+), 53 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
new file mode 100644
index 00000000000..f29d62db5f4
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class ConnectionHandlerTest extends ProducerConsumerBase {
+
+ // Backoff handed to the handlers in the tests below: built with an initial time of 1 ms,
+ // a mandatory stop of 1 s and a max of 3 s.
+ private static final Backoff BACKOFF = new
BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(1, TimeUnit.SECONDS)
+ .setMax(3, TimeUnit.SECONDS).create();
+ // Fixed pool of 4 threads. Within this class it is only referenced by cleanup(), which shuts it down.
+ private final ExecutorService executor = Executors.newFixedThreadPool(4);
+
+ /**
+ * Class-level fixture: delegates to the base class' internalSetup() followed by producerBaseSetup().
+ * NOTE(review): presumably this starts the test broker and creates the {@code pulsarClient} used by the
+ * tests below - confirm against ProducerConsumerBase, which is not visible here.
+ *
+ * @throws Exception if either base-class setup step fails
+ */
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+    /**
+     * Class-level teardown: runs the base class' internalCleanup() and then shuts down the executor
+     * owned by this test.
+     *
+     * <p>{@code alwaysRun = true} mirrors {@link #setup()} so that the teardown still executes when an
+     * earlier configuration method failed or was skipped; otherwise the resources created during setup
+     * would be left behind.
+     *
+     * @throws Exception if the base-class cleanup fails
+     */
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        try {
+            super.internalCleanup();
+        } finally {
+            // The pool uses non-daemon threads; release it even when the base cleanup throws.
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Creates ten independent handlers one after another and checks, for each of them, that a single
+     * grabCnx() call ends up invoking the connectionOpened callback. The callback publishes the
+     * iteration index through a future, and the test blocks on that future before moving on.
+     */
+    @Test(timeOut = 30000)
+    public void testSynchronousGrabCnx() {
+        for (int round = 0; round < 10; round++) {
+            final int expected = round;
+            final CompletableFuture<Integer> opened = new CompletableFuture<>();
+            final MockedHandlerState handlerState =
+                    new MockedHandlerState((PulsarClientImpl) pulsarClient, "my-topic");
+            final ConnectionHandler connectionHandler = new ConnectionHandler(handlerState, BACKOFF, cnx -> {
+                // Signal the test thread that the callback ran for this round.
+                opened.complete(expected);
+                return CompletableFuture.completedFuture(null);
+            });
+            connectionHandler.grabCnx();
+            Assert.assertEquals(opened.join(), round);
+        }
+    }
+
+    /**
+     * Issues ten back-to-back grabCnx() calls on the same handler and expects the connectionOpened
+     * callback to be invoked exactly once: the extra calls must be skipped while the first connection
+     * attempt is still pending.
+     */
+    @Test
+    public void testConcurrentGrabCnx() {
+        final int attempts = 10;
+        final AtomicInteger openedCount = new AtomicInteger(0);
+        final MockedHandlerState handlerState =
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, "my-topic");
+        final ConnectionHandler connectionHandler = new ConnectionHandler(handlerState, BACKOFF, cnx -> {
+            openedCount.incrementAndGet();
+            return CompletableFuture.completedFuture(null);
+        });
+
+        int issued = 0;
+        while (issued < attempts) {
+            connectionHandler.grabCnx();
+            issued++;
+        }
+
+        // At least one callback must arrive within 3 seconds ...
+        Awaitility.await()
+                .atMost(Duration.ofSeconds(3))
+                .until(() -> openedCount.get() > 0);
+        // ... but the counter must never reach the number of attempts: waiting for it has to time out.
+        Assert.assertThrows(ConditionTimeoutException.class, () -> Awaitility.await()
+                .atMost(Duration.ofMillis(500))
+                .until(() -> openedCount.get() == attempts));
+        Assert.assertEquals(openedCount.get(), 1);
+    }
+
+    /**
+     * Checks that every grabCnx() attempt acquires the {@code duringConnect} flag once and releases it
+     * once, whatever the outcome of the attempt. One spied flag is injected into three different
+     * handlers, so the verified invocation counts accumulate from scenario to scenario.
+     *
+     * @throws IllegalAccessException if the {@code duringConnect} field cannot be overwritten
+     */
+    @Test
+    public void testDuringConnectInvokeCount() throws IllegalAccessException {
+        final AtomicBoolean flag = spy(new AtomicBoolean());
+        final PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+
+        // 1. connectionOpened completes with null
+        final ConnectionHandler succeeding = new ConnectionHandler(
+                new MockedHandlerState(client, "my-topic"), BACKOFF,
+                cnx -> CompletableFuture.completedFuture(null));
+        grabCnxAndVerifyFlag(succeeding, flag, 1);
+
+        // 2. connectionFailed is called
+        final ConnectionHandler withoutTopic = new ConnectionHandler(
+                new MockedHandlerState(client, null), new MockedBackoff(),
+                cnx -> CompletableFuture.completedFuture(null));
+        grabCnxAndVerifyFlag(withoutTopic, flag, 2);
+
+        // 3. connectionOpened completes exceptionally
+        final ConnectionHandler failingOnOpen = new ConnectionHandler(
+                new MockedHandlerState(client, "my-topic"), new MockedBackoff(),
+                cnx -> FutureUtil.failedFuture(new RuntimeException("fail")));
+        grabCnxAndVerifyFlag(failingOnOpen, flag, 3);
+    }
+
+    /**
+     * Injects the spied flag into the handler, triggers one grabCnx(), waits up to 3 seconds for the
+     * flag to read false again and then verifies the cumulative number of acquire/release calls.
+     */
+    private static void grabCnxAndVerifyFlag(ConnectionHandler handler, AtomicBoolean flag, int expectedCalls)
+            throws IllegalAccessException {
+        FieldUtils.writeField(handler, "duringConnect", flag, true);
+        handler.grabCnx();
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> !flag.get());
+        verify(flag, times(expectedCalls)).compareAndSet(false, true);
+        verify(flag, times(expectedCalls)).set(false);
+    }
+
+ /**
+ * Minimal concrete HandlerState for the tests: it only forwards the client and topic to the
+ * superclass and reports a fixed handler name.
+ */
+ private static class MockedHandlerState extends HandlerState {
+
+ // The topic is passed through unchanged; one test above passes null here.
+ public MockedHandlerState(PulsarClientImpl client, String topic) {
+ super(client, topic);
+ }
+
+ // Fixed name returned for every instance.
+ @Override
+ String getHandlerName() {
+ return "mocked";
+ }
+ }
+
+ /**
+ * Backoff whose durations are all expressed in hours, used by the failure scenarios above.
+ */
+ private static class MockedBackoff extends Backoff {
+
+ // Set a large backoff so that reconnection won't happen in tests
+ // NOTE(review): presumably the three value/unit pairs are initial, max and mandatory stop -
+ // confirm against the Backoff constructor, which is not visible here.
+ public MockedBackoff() {
+ super(1, TimeUnit.HOURS, 2, TimeUnit.HOURS, 1, TimeUnit.HOURS);
+ }
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 6c5a5be200f..ea1e09670e9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -40,10 +41,16 @@ public class ConnectionHandler {
// Start with -1L because it gets incremented before sending on the first
connection
private volatile long epoch = -1L;
protected volatile long lastConnectionClosedTimestamp = 0L;
+ private final AtomicBoolean duringConnect = new AtomicBoolean(false);
interface Connection {
- void connectionFailed(PulsarClientException exception);
- void connectionOpened(ClientCnx cnx);
+
+ /**
+ * @apiNote If the returned future is completed exceptionally,
reconnectLater will be called.
+ */
+ CompletableFuture<Void> connectionOpened(ClientCnx cnx);
+ default void connectionFailed(PulsarClientException e) {
+ }
}
protected Connection connection;
@@ -68,6 +75,11 @@ public class ConnectionHandler {
state.topic, state.getHandlerName(), state.getState());
return;
}
+ if (!duringConnect.compareAndSet(false, true)) {
+ log.info("[{}] [{}] Skip grabbing the connection since there is a
pending connection",
+ state.topic, state.getHandlerName());
+ return;
+ }
try {
CompletableFuture<ClientCnx> cnxFuture;
@@ -76,7 +88,8 @@ public class ConnectionHandler {
} else {
cnxFuture = state.client.getConnection(state.topic); //
}
- cnxFuture.thenAccept(cnx -> connection.connectionOpened(cnx)) //
+ cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
+ .thenAccept(__ -> duringConnect.set(false))
.exceptionally(this::handleConnectionError);
} catch (Throwable t) {
log.warn("[{}] [{}] Exception thrown while getting connection: ",
state.topic, state.getHandlerName(), t);
@@ -85,25 +98,27 @@ public class ConnectionHandler {
}
private Void handleConnectionError(Throwable exception) {
- log.warn("[{}] [{}] Error connecting to broker: {}",
- state.topic, state.getHandlerName(), exception.getMessage());
- if (exception instanceof PulsarClientException) {
- connection.connectionFailed((PulsarClientException) exception);
- } else if (exception.getCause() instanceof PulsarClientException) {
- connection.connectionFailed((PulsarClientException)
exception.getCause());
- } else {
- connection.connectionFailed(new PulsarClientException(exception));
- }
-
- State state = this.state.getState();
- if (state == State.Uninitialized || state == State.Connecting || state
== State.Ready) {
- reconnectLater(exception);
+ try {
+ log.warn("[{}] [{}] Error connecting to broker: {}",
+ state.topic, state.getHandlerName(),
exception.getMessage());
+ if (exception instanceof PulsarClientException) {
+ connection.connectionFailed((PulsarClientException) exception);
+ } else if (exception.getCause() instanceof PulsarClientException) {
+ connection.connectionFailed((PulsarClientException)
exception.getCause());
+ } else {
+ connection.connectionFailed(new
PulsarClientException(exception));
+ }
+ } catch (Throwable throwable) {
+ log.error("[{}] [{}] Unexpected exception after the connection",
+ state.topic, state.getHandlerName(), throwable);
}
+ reconnectLater(exception);
return null;
}
- protected void reconnectLater(Throwable exception) {
+ void reconnectLater(Throwable exception) {
+ duringConnect.set(false);
CLIENT_CNX_UPDATER.set(this, null);
if (!isValidStateForReconnection()) {
log.info("[{}] [{}] Ignoring reconnection request (state: {})",
@@ -127,6 +142,7 @@ public class ConnectionHandler {
public void connectionClosed(ClientCnx cnx) {
lastConnectionClosedTimestamp = System.currentTimeMillis();
+ duringConnect.set(false);
state.client.getCnxPool().releaseConnection(cnx);
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
if (!isValidStateForReconnection()) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 101d42a6694..f845407435e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -801,16 +801,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
- public void connectionOpened(final ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
- if (getState() == State.Closing || getState() == State.Closed) {
+ final State state = getState();
+ if (state == State.Closing || state == State.Closed) {
setState(State.Closed);
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
clearReceiverQueue();
- return;
+ return CompletableFuture.completedFuture(null);
}
log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
@@ -859,6 +860,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
&& startMessageId.equals(initialStartMessageId)) ?
startMessageRollbackDurationInSec : 0;
// synchronized this, because redeliverUnAckMessage eliminate the
epoch inconsistency between them
+ final CompletableFuture<Void> future = new CompletableFuture<>();
synchronized (this) {
setClientCnx(cnx);
ByteBuf request = Commands.newSubscribe(topic, subscription,
consumerId, requestId, getSubType(),
@@ -880,6 +882,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
deregisterFromClientCnx();
client.cleanupConsumer(this);
cnx.channel().close();
+ future.complete(null);
return;
}
}
@@ -892,12 +895,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (!(firstTimeConnect && hasParentConsumer) &&
getCurrentReceiverQueueSize() != 0) {
increaseAvailablePermits(cnx,
getCurrentReceiverQueueSize());
}
+ future.complete(null);
}).exceptionally((e) -> {
deregisterFromClientCnx();
if (getState() == State.Closing || getState() == State.Closed)
{
// Consumer was closed while reconnecting, close the
connection to make sure the broker
// drops the consumer on its side
cnx.channel().close();
+ future.complete(null);
return null;
}
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
@@ -915,7 +920,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (e.getCause() instanceof PulsarClientException
&& PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis() <
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
- reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
} else if (!subscribeFuture.isDone()) {
// unable to create new consumer, fail operation
setState(State.Failed);
@@ -939,11 +944,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
topic, subscription,
cnx.channel().remoteAddress());
} else {
// consumer was subscribed and connected but we got some
error, keep trying
- reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
+ }
+
+ if (!future.isDone()) {
+ future.complete(null);
}
return null;
});
}
+ return future;
}
protected void consumerIsReconnectedToBroker(ClientCnx cnx, int
currentQueueSize) {
@@ -1028,7 +1038,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
setState(State.Failed);
if (nonRetriableError) {
log.info("[{}] Consumer creation failed for consumer {}
with unretriableError {}",
- topic, consumerId, exception);
+ topic, consumerId, exception.getMessage());
} else {
log.info("[{}] Consumer creation failed for consumer {}
after timeout", topic, consumerId);
}
@@ -2639,10 +2649,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
setClientCnx(null);
}
- void reconnectLater(Throwable exception) {
- this.connectionHandler.reconnectLater(exception);
- }
-
void grabCnx() {
this.connectionHandler.grabCnx();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index c382153f4ae..869aad0fc01 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1620,8 +1620,9 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
}
+
@Override
- public void connectionOpened(final ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
chunkMaxMessageSize = Math.min(chunkMaxMessageSize,
ClientCnx.getMaxMessageSize());
@@ -1630,7 +1631,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
// Because the state could have been updated while retrieving the
connection, we set it back to connecting,
// as long as the change from current state to connecting is a
valid state change.
if (!changeToConnecting()) {
- return;
+ return CompletableFuture.completedFuture(null);
}
// We set the cnx reference before registering the producer on the
cnx, so if the cnx breaks before creating
// the producer, it will try to grab a new cnx. We also increment
and get the epoch value for the producer.
@@ -1670,6 +1671,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
}
+ final CompletableFuture<Void> future = new CompletableFuture<>();
cnx.sendRequestWithId(
Commands.newProducer(topic, producerId, requestId,
producerName, conf.isEncryptionEnabled(), metadata,
schemaInfo, epoch, userProvidedProducerName,
@@ -1684,11 +1686,13 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
// We are now reconnected to broker and clear to send
messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent
immediately
synchronized (ProducerImpl.this) {
- if (getState() == State.Closing || getState() ==
State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
// Producer was closed while reconnecting, close
the connection to make sure the broker
// drops the producer on its side
cnx.removeProducer(producerId);
cnx.channel().close();
+ future.complete(null);
return;
}
resetBackoff();
@@ -1715,13 +1719,16 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
resendMessages(cnx, epoch);
}
+ future.complete(null);
}).exceptionally((e) -> {
Throwable cause = e.getCause();
cnx.removeProducer(producerId);
- if (getState() == State.Closing || getState() ==
State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
// Producer was closed while reconnecting, close the
connection to make sure the broker
// drops the producer on its side
cnx.channel().close();
+ future.complete(null);
return null;
}
@@ -1750,6 +1757,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
producerCreatedFuture.completeExceptionally(cause);
});
+ future.complete(null);
return null;
}
if (cause instanceof
PulsarClientException.ProducerBlockedQuotaExceededException) {
@@ -1793,7 +1801,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
&& System.currentTimeMillis() <
PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this))) {
// Either we had already created the producer once
(producerCreatedFuture.isDone()) or we are
// still within the initial timeout budget and we are
dealing with a retriable error
- reconnectLater(cause);
+ future.completeExceptionally(cause);
} else {
setState(State.Failed);
producerCreatedFuture.completeExceptionally(cause);
@@ -1805,9 +1813,12 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
sendTimeout = null;
}
}
-
+ if (!future.isDone()) {
+ future.complete(null);
+ }
return null;
});
+ return future;
}
@Override
@@ -1819,7 +1830,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
if (producerCreatedFuture.completeExceptionally(exception)) {
if (nonRetriableError) {
log.info("[{}] Producer creation failed for producer {}
with unretriableError = {}",
- topic, producerId, exception);
+ topic, producerId, exception.getMessage());
} else {
log.info("[{}] Producer creation failed for producer {}
after producerTimeout", topic, producerId);
}
@@ -2334,10 +2345,6 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
this.connectionHandler.setClientCnx(clientCnx);
}
- void reconnectLater(Throwable exception) {
- this.connectionHandler.reconnectLater(exception);
- }
-
void grabCnx() {
this.connectionHandler.grabCnx();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index d2ca018aef9..5ee47c929e4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -89,7 +89,7 @@ public class TopicListWatcher extends HandlerState implements
ConnectionHandler.
if (watcherFuture.completeExceptionally(exception)) {
setState(State.Failed);
log.info("[{}] Watcher creation failed for {} with
non-retriable error {}",
- topic, name, exception);
+ topic, name, exception.getMessage());
deregisterFromClientCnx();
}
} else {
@@ -98,13 +98,14 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
}
@Override
- public void connectionOpened(ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
previousExceptions.clear();
- if (getState() == State.Closing || getState() == State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
setState(State.Closed);
deregisterFromClientCnx();
- return;
+ return CompletableFuture.completedFuture(null);
}
log.info("[{}][{}] Creating topic list watcher on cnx {}, watcherId
{}",
@@ -116,6 +117,7 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
.compareAndSet(this, 0L, System.currentTimeMillis()
+ client.getConfiguration().getOperationTimeoutMs());
+ final CompletableFuture<Void> future = new CompletableFuture<>();
// synchronized this, because redeliverUnAckMessage eliminate the
epoch inconsistency between them
synchronized (this) {
setClientCnx(cnx);
@@ -132,6 +134,7 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
setState(State.Closed);
deregisterFromClientCnx();
cnx.channel().close();
+ future.complete(null);
return;
}
}
@@ -139,13 +142,14 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
this.connectionHandler.resetBackoff();
watcherFuture.complete(this);
-
+ future.complete(null);
}).exceptionally((e) -> {
deregisterFromClientCnx();
if (getState() == State.Closing || getState() ==
State.Closed) {
// Watcher was closed while reconnecting, close
the connection to make sure the broker
// drops the watcher on its side
cnx.channel().close();
+ future.complete(null);
return null;
}
log.warn("[{}][{}] Failed to subscribe to topic on
{}", topic,
@@ -155,7 +159,7 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
&&
PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis()
<
CREATE_WATCHER_DEADLINE_UPDATER.get(TopicListWatcher.this)) {
- reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
} else if (!watcherFuture.isDone()) {
// unable to create new watcher, fail operation
setState(State.Failed);
@@ -164,11 +168,15 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
+ "when connecting to the
broker", getHandlerName())));
} else {
// watcher was subscribed and connected, but we
got some error, keep trying
- reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
+ }
+ if (!future.isDone()) {
+ future.complete(null);
}
return null;
});
}
+ return future;
}
@Override
@@ -249,11 +257,6 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
setClientCnx(null);
}
- void reconnectLater(Throwable exception) {
- this.connectionHandler.reconnectLater(exception);
- }
-
-
private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable
exception) {
log.info("[{}] Closed topic list watcher", getHandlerName());
setState(State.Closed);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 15533cfd2a9..3d44b789135 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -123,14 +123,17 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
@Override
- public void connectionOpened(ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
LOG.info("Transaction meta handler with transaction coordinator id
{} connection opened.",
transactionCoordinatorId);
- if (getState() == State.Closing || getState() == State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
setState(State.Closed);
failPendingRequest();
+ future.complete(null);
return;
}
@@ -146,6 +149,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
this.connectionHandler.resetBackoff();
pendingRequests.forEach((requestID, opBase) ->
checkStateAndSendRequest(opBase));
}
+ future.complete(null);
});
}).exceptionally((e) -> {
internalPinnedExecutor.execute(() -> {
@@ -155,16 +159,19 @@ public class TransactionMetaStoreHandler extends
HandlerState
|| e.getCause() instanceof
PulsarClientException.NotAllowedException) {
setState(State.Closed);
cnx.channel().close();
+ future.complete(null);
} else {
- connectionHandler.reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
}
});
return null;
});
} else {
registerToConnection(cnx);
+ future.complete(null);
}
});
+ return future;
}
private boolean registerToConnection(ClientCnx cnx) {