This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new d5631c961c1 [fix][client] Make the whole grabCnx() progress atomic (#20595)
d5631c961c1 is described below
commit d5631c961c1a026d7deef2c5405b1456f0dbfd77
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jun 30 16:54:17 2023 +0800
[fix][client] Make the whole grabCnx() progress atomic (#20595)
(cherry picked from commit 2bede012c73c301b73c079aaeb122cbb472728c1)
---
.../pulsar/client/impl/ConnectionHandlerTest.java | 154 +++++++++++++++++++++
.../pulsar/client/impl/ConnectionHandler.java | 49 ++++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 26 ++--
.../apache/pulsar/client/impl/ProducerImpl.java | 29 ++--
.../client/impl/TransactionMetaStoreHandler.java | 13 +-
5 files changed, 231 insertions(+), 40 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
new file mode 100644
index 00000000000..d93d87c9137
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class ConnectionHandlerTest extends ProducerConsumerBase {
+
+ private static final Backoff BACKOFF = new
BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(1, TimeUnit.SECONDS)
+ .setMax(3, TimeUnit.SECONDS).create();
+ private final ExecutorService executor = Executors.newFixedThreadPool(4);
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true) // mirror setup(): always tear down what setup() created
+ @Override
+ protected void cleanup() throws Exception {
+ executor.shutdown(); // release the pool first so a failing internalCleanup() cannot leak it
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 30000)
+ public void testSynchronousGrabCnx() {
+ for (int i = 0; i < 10; i++) {
+ final CompletableFuture<Integer> future = new
CompletableFuture<>();
+ final int index = i;
+ final ConnectionHandler handler = new ConnectionHandler(
+ new MockedHandlerState((PulsarClientImpl) pulsarClient,
"my-topic"), BACKOFF,
+ cnx -> {
+ future.complete(index);
+ return CompletableFuture.completedFuture(null);
+ });
+ handler.grabCnx();
+ Assert.assertEquals(future.join().intValue(), i);
+ }
+ }
+
+ @Test
+ public void testConcurrentGrabCnx() {
+ final AtomicInteger cnt = new AtomicInteger(0);
+ final ConnectionHandler handler = new ConnectionHandler(
+ new MockedHandlerState((PulsarClientImpl) pulsarClient,
"my-topic"), BACKOFF,
+ cnx -> {
+ cnt.incrementAndGet();
+ return CompletableFuture.completedFuture(null);
+ });
+ final int numGrab = 10;
+ for (int i = 0; i < numGrab; i++) {
+ handler.grabCnx();
+ }
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> cnt.get()
> 0);
+ Assert.assertThrows(ConditionTimeoutException.class,
+ () ->
Awaitility.await().atMost(Duration.ofMillis(500)).until(() -> cnt.get() ==
numGrab));
+ Assert.assertEquals(cnt.get(), 1);
+ }
+
+ @Test
+ public void testDuringConnectInvokeCount() throws IllegalAccessException {
+ // 1. connectionOpened completes with null
+ final AtomicBoolean duringConnect = spy(new AtomicBoolean());
+ final ConnectionHandler handler1 = new ConnectionHandler(
+ new MockedHandlerState((PulsarClientImpl) pulsarClient,
"my-topic"), BACKOFF,
+ cnx -> CompletableFuture.completedFuture(null));
+ FieldUtils.writeField(handler1, "duringConnect", duringConnect, true);
+ handler1.grabCnx();
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() ->
!duringConnect.get());
+ verify(duringConnect, times(1)).compareAndSet(false, true);
+ verify(duringConnect, times(1)).set(false);
+
+ // 2. connectionFailed is called
+ final ConnectionHandler handler2 = new ConnectionHandler(
+ new MockedHandlerState((PulsarClientImpl) pulsarClient, null),
new MockedBackoff(),
+ cnx -> CompletableFuture.completedFuture(null));
+ FieldUtils.writeField(handler2, "duringConnect", duringConnect, true);
+ handler2.grabCnx();
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() ->
!duringConnect.get());
+ verify(duringConnect, times(2)).compareAndSet(false, true);
+ verify(duringConnect, times(2)).set(false);
+
+ // 3. connectionOpened completes exceptionally
+ final ConnectionHandler handler3 = new ConnectionHandler(
+ new MockedHandlerState((PulsarClientImpl) pulsarClient,
"my-topic"), new MockedBackoff(),
+ cnx -> FutureUtil.failedFuture(new RuntimeException("fail")));
+ FieldUtils.writeField(handler3, "duringConnect", duringConnect, true);
+ handler3.grabCnx();
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() ->
!duringConnect.get());
+ verify(duringConnect, times(3)).compareAndSet(false, true);
+ verify(duringConnect, times(3)).set(false);
+ }
+
+ private static class MockedHandlerState extends HandlerState {
+
+ public MockedHandlerState(PulsarClientImpl client, String topic) {
+ super(client, topic);
+ }
+
+ @Override
+ String getHandlerName() {
+ return "mocked";
+ }
+ }
+
+ private static class MockedBackoff extends Backoff {
+
+ // Set a large backoff so that reconnection won't happen in tests
+ public MockedBackoff() {
+ super(1, TimeUnit.HOURS, 2, TimeUnit.HOURS, 1, TimeUnit.HOURS);
+ }
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index a951d7b2cb8..1d9c28d90ea 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.client.impl;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -39,10 +41,16 @@ public class ConnectionHandler {
// Start with -1L because it gets incremented before sending on the first
connection
private volatile long epoch = -1L;
protected volatile long lastConnectionClosedTimestamp = 0L;
+ private final AtomicBoolean duringConnect = new AtomicBoolean(false);
interface Connection {
- void connectionFailed(PulsarClientException exception);
- void connectionOpened(ClientCnx cnx);
+
+ /**
+ * @apiNote If the returned future is completed exceptionally,
reconnectLater will be called.
+ */
+ CompletableFuture<Void> connectionOpened(ClientCnx cnx);
+ default void connectionFailed(PulsarClientException e) {
+ }
}
protected Connection connection;
@@ -67,10 +75,16 @@ public class ConnectionHandler {
state.topic, state.getHandlerName(), state.getState());
return;
}
+ if (!duringConnect.compareAndSet(false, true)) {
+ log.info("[{}] [{}] Skip grabbing the connection since there is a
pending connection",
+ state.topic, state.getHandlerName());
+ return;
+ }
try {
state.client.getConnection(state.topic) //
- .thenAccept(cnx -> connection.connectionOpened(cnx)) //
+ .thenCompose(cnx -> connection.connectionOpened(cnx)) // hold duringConnect until the handler's future completes; failures reach handleConnectionError
+ .thenAccept(__ -> duringConnect.set(false))
.exceptionally(this::handleConnectionError);
} catch (Throwable t) {
log.warn("[{}] [{}] Exception thrown while getting connection: ",
state.topic, state.getHandlerName(), t);
@@ -79,25 +93,27 @@ public class ConnectionHandler {
}
private Void handleConnectionError(Throwable exception) {
- log.warn("[{}] [{}] Error connecting to broker: {}",
- state.topic, state.getHandlerName(), exception.getMessage());
- if (exception instanceof PulsarClientException) {
- connection.connectionFailed((PulsarClientException) exception);
- } else if (exception.getCause() instanceof PulsarClientException) {
- connection.connectionFailed((PulsarClientException)
exception.getCause());
- } else {
- connection.connectionFailed(new PulsarClientException(exception));
- }
-
- State state = this.state.getState();
- if (state == State.Uninitialized || state == State.Connecting || state
== State.Ready) {
- reconnectLater(exception);
+ try {
+ log.warn("[{}] [{}] Error connecting to broker: {}",
+ state.topic, state.getHandlerName(),
exception.getMessage());
+ if (exception instanceof PulsarClientException) {
+ connection.connectionFailed((PulsarClientException) exception);
+ } else if (exception.getCause() instanceof PulsarClientException) {
+ connection.connectionFailed((PulsarClientException)
exception.getCause());
+ } else {
+ connection.connectionFailed(new
PulsarClientException(exception));
+ }
+ } catch (Throwable throwable) {
+ log.error("[{}] [{}] Unexpected exception after the connection",
+ state.topic, state.getHandlerName(), throwable);
}
+ reconnectLater(exception);
return null;
}
- protected void reconnectLater(Throwable exception) {
+ void reconnectLater(Throwable exception) {
+ duringConnect.set(false);
CLIENT_CNX_UPDATER.set(this, null);
if (!isValidStateForReconnection()) {
log.info("[{}] [{}] Ignoring reconnection request (state: {})",
@@ -121,6 +137,7 @@ public class ConnectionHandler {
public void connectionClosed(ClientCnx cnx) {
lastConnectionClosedTimestamp = System.currentTimeMillis();
+ duringConnect.set(false);
state.client.getCnxPool().releaseConnection(cnx);
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
if (!isValidStateForReconnection()) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 453fd528432..4a7fef11af4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -765,16 +765,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
- public void connectionOpened(final ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
- if (getState() == State.Closing || getState() == State.Closed) {
+ final State state = getState();
+ if (state == State.Closing || state == State.Closed) {
setState(State.Closed);
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
clearReceiverQueue();
- return;
+ return CompletableFuture.completedFuture(null);
}
log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
@@ -823,6 +824,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
&& startMessageId.equals(initialStartMessageId)) ?
startMessageRollbackDurationInSec : 0;
// synchronized this, because redeliverUnAckMessage eliminate the
epoch inconsistency between them
+ final CompletableFuture<Void> future = new CompletableFuture<>();
synchronized (this) {
setClientCnx(cnx);
ByteBuf request = Commands.newSubscribe(topic, subscription,
consumerId, requestId, getSubType(),
@@ -844,6 +846,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
deregisterFromClientCnx();
client.cleanupConsumer(this);
cnx.channel().close();
+ future.complete(null);
return;
}
}
@@ -856,12 +859,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (!(firstTimeConnect && hasParentConsumer) &&
conf.getReceiverQueueSize() != 0) {
increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
}
+ future.complete(null);
}).exceptionally((e) -> {
deregisterFromClientCnx();
if (getState() == State.Closing || getState() == State.Closed)
{
// Consumer was closed while reconnecting, close the
connection to make sure the broker
// drops the consumer on its side
cnx.channel().close();
+ future.complete(null);
return null;
}
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
@@ -879,7 +884,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (e.getCause() instanceof PulsarClientException
&& PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis() <
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
- reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
} else if (!subscribeFuture.isDone()) {
// unable to create new consumer, fail operation
setState(State.Failed);
@@ -903,11 +908,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
topic, subscription,
cnx.channel().remoteAddress());
} else {
// consumer was subscribed and connected but we got some
error, keep trying
- reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
+ }
+
+ if (!future.isDone()) {
+ future.complete(null);
}
return null;
});
}
+ return future;
}
protected void consumerIsReconnectedToBroker(ClientCnx cnx, int
currentQueueSize) {
@@ -992,7 +1002,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
setState(State.Failed);
if (nonRetriableError) {
log.info("[{}] Consumer creation failed for consumer {}
with unretriableError {}",
- topic, consumerId, exception);
+ topic, consumerId, exception.getMessage());
} else {
log.info("[{}] Consumer creation failed for consumer {}
after timeout", topic, consumerId);
}
@@ -2580,10 +2590,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
setClientCnx(null);
}
- void reconnectLater(Throwable exception) {
- this.connectionHandler.reconnectLater(exception);
- }
-
void grabCnx() {
this.connectionHandler.grabCnx();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index f2334650ad8..a8ef3236f56 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1536,8 +1536,9 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
}
+
@Override
- public void connectionOpened(final ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
final long epoch;
@@ -1545,7 +1546,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
// Because the state could have been updated while retrieving the
connection, we set it back to connecting,
// as long as the change from current state to connecting is a
valid state change.
if (!changeToConnecting()) {
- return;
+ return CompletableFuture.completedFuture(null);
}
// We set the cnx reference before registering the producer on the
cnx, so if the cnx breaks before creating
// the producer, it will try to grab a new cnx. We also increment
and get the epoch value for the producer.
@@ -1585,6 +1586,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
}
+ final CompletableFuture<Void> future = new CompletableFuture<>();
cnx.sendRequestWithId(
Commands.newProducer(topic, producerId, requestId,
producerName, conf.isEncryptionEnabled(), metadata,
schemaInfo, epoch, userProvidedProducerName,
@@ -1599,11 +1601,13 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
// We are now reconnected to broker and clear to send
messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent
immediately
synchronized (ProducerImpl.this) {
- if (getState() == State.Closing || getState() ==
State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
// Producer was closed while reconnecting, close
the connection to make sure the broker
// drops the producer on its side
cnx.removeProducer(producerId);
cnx.channel().close();
+ future.complete(null);
return;
}
resetBackoff();
@@ -1653,13 +1657,16 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
resendMessages(cnx, epoch);
}
+ future.complete(null);
}).exceptionally((e) -> {
Throwable cause = e.getCause();
cnx.removeProducer(producerId);
- if (getState() == State.Closing || getState() ==
State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
// Producer was closed while reconnecting, close the
connection to make sure the broker
// drops the producer on its side
cnx.channel().close();
+ future.complete(null);
return null;
}
@@ -1688,6 +1695,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
producerCreatedFuture.completeExceptionally(cause);
});
+ future.complete(null);
return null;
}
if (cause instanceof
PulsarClientException.ProducerBlockedQuotaExceededException) {
@@ -1731,7 +1739,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
&& System.currentTimeMillis() <
PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this))) {
// Either we had already created the producer once
(producerCreatedFuture.isDone()) or we are
// still within the initial timeout budget and we are
dealing with a retriable error
- reconnectLater(cause);
+ future.completeExceptionally(cause);
} else {
setState(State.Failed);
producerCreatedFuture.completeExceptionally(cause);
@@ -1743,9 +1751,12 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
sendTimeout = null;
}
}
-
+ if (!future.isDone()) {
+ future.complete(null);
+ }
return null;
});
+ return future;
}
@Override
@@ -1757,7 +1768,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
if (producerCreatedFuture.completeExceptionally(exception)) {
if (nonRetriableError) {
log.info("[{}] Producer creation failed for producer {}
with unretriableError = {}",
- topic, producerId, exception);
+ topic, producerId, exception.getMessage());
} else {
log.info("[{}] Producer creation failed for producer {}
after producerTimeout", topic, producerId);
}
@@ -2183,10 +2194,6 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
this.connectionHandler.setClientCnx(clientCnx);
}
- void reconnectLater(Throwable exception) {
- this.connectionHandler.reconnectLater(exception);
- }
-
void grabCnx() {
this.connectionHandler.grabCnx();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index baff5deb3f1..56443709bf3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -123,14 +123,17 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
@Override
- public void connectionOpened(ClientCnx cnx) {
+ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
LOG.info("Transaction meta handler with transaction coordinator id
{} connection opened.",
transactionCoordinatorId);
- if (getState() == State.Closing || getState() == State.Closed) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
setState(State.Closed);
failPendingRequest();
+ future.complete(null);
return;
}
@@ -146,6 +149,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
this.connectionHandler.resetBackoff();
pendingRequests.forEach((requestID, opBase) ->
checkStateAndSendRequest(opBase));
}
+ future.complete(null);
});
}).exceptionally((e) -> {
internalPinnedExecutor.execute(() -> {
@@ -155,16 +159,19 @@ public class TransactionMetaStoreHandler extends
HandlerState
|| e.getCause() instanceof
PulsarClientException.NotAllowedException) {
setState(State.Closed);
cnx.channel().close();
+ future.complete(null);
} else {
- connectionHandler.reconnectLater(e.getCause());
+ future.completeExceptionally(e.getCause());
}
});
return null;
});
} else {
registerToConnection(cnx);
+ future.complete(null);
}
});
+ return future;
}
private boolean registerToConnection(ClientCnx cnx) {