This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fff54d5593471bf38de915d9f72bae5e988226a7 Author: congbo <[email protected]> AuthorDate: Tue Nov 9 19:34:14 2021 +0800 [Transaction] Fix close pulsarClient then close transaction client connection (#12689) (cherry picked from commit 6162ccf34aa29e9495bf2f0cdc7e88e9e8c1d067) --- ...Test.java => TransactionClientConnectTest.java} | 29 +++++++++++++++++++++- .../broker/transaction/TransactionTestBase.java | 2 +- .../pulsar/client/impl/ConnectionHandler.java | 16 +++++++----- .../apache/pulsar/client/impl/HandlerState.java | 7 ++++++ .../pulsar/client/impl/PulsarClientImpl.java | 16 ++++++------ .../client/impl/TransactionMetaStoreHandler.java | 7 ++++++ 6 files changed, 61 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java similarity index 89% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java index 1f5ab15..42eadfe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java @@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.TransactionMetaStoreHandler; import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -39,6 +40,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -47,7 +49,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.FileAssert.fail; -public class TransactionClientReconnectTest extends TransactionTestBase { +public class TransactionClientConnectTest extends TransactionTestBase { private static final String RECONNECT_TOPIC = "persistent://public/txn/txn-client-reconnect-test"; private static final int NUM_PARTITIONS = 1; @@ -223,6 +225,31 @@ public class TransactionClientReconnectTest extends TransactionTestBase { reconnect(); } + @Test + public void testPulsarClientCloseThenCloseTcClient() throws Exception { + TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient(); + Field field = TransactionCoordinatorClientImpl.class.getDeclaredField("handlers"); + field.setAccessible(true); + TransactionMetaStoreHandler[] handlers = + (TransactionMetaStoreHandler[]) field.get(transactionCoordinatorClient); + + for (TransactionMetaStoreHandler handler : handlers) { + handler.newTransactionAsync(10, TimeUnit.SECONDS).get(); + } + pulsarClient.close(); + for (TransactionMetaStoreHandler handler : handlers) { + Method method = TransactionMetaStoreHandler.class.getMethod("getConnectHandleState"); + method.setAccessible(true); + assertEquals(method.invoke(handler).toString(), "Closed"); + try { + handler.newTransactionAsync(10, TimeUnit.SECONDS).get(); + } catch (ExecutionException | InterruptedException e) { + assertTrue(e.getCause() + instanceof TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException); + } + } + } + public void start() throws Exception { // wait transaction coordinator init success Awaitility.await().until(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index 622421b..1dba73a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -246,7 +246,7 @@ public abstract class TransactionTestBase extends TestRetrySupport { admin = null; } if (pulsarClient != null) { - pulsarClient.shutdown(); + pulsarClient.close(); pulsarClient = null; } if (pulsarServiceList.size() > 0) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 8fb7ab4..9babd2a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -103,12 +103,16 @@ public class ConnectionHandler { long delayMs = backoff.next(); log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", state.topic, state.getHandlerName(), exception.getMessage(), delayMs / 1000.0); - state.setState(State.Connecting); - state.client.timer().newTimeout(timeout -> { - log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName()); - incrementEpoch(); - grabCnx(); - }, delayMs, TimeUnit.MILLISECONDS); + if (state.changeToConnecting()) { + state.client.timer().newTimeout(timeout -> { + log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName()); + incrementEpoch(); + grabCnx(); + }, delayMs, TimeUnit.MILLISECONDS); + } else { + log.info("[{}] [{}] Ignoring reconnection request (state: {})", + state.topic, state.getHandlerName(), state.getState()); + } } protected long incrementEpoch() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java index e72c97f..582df8c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java @@ -64,6 +64,13 @@ abstract class HandlerState { return STATE_UPDATER.get(this); } + protected boolean changeToConnecting() { + return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Connecting) + || STATE_UPDATER.compareAndSet(this, State.Ready, State.Connecting) + || STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Connecting) + || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Connecting)); + } + protected void setState(State s) { STATE_UPDATER.set(this, s); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 1234b8b..7703afc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -712,6 +712,14 @@ public class PulsarClientImpl implements PulsarClient { throwable = t; } } + if (tcClient != null) { + try { + tcClient.close(); + } catch (Throwable t) { + log.warn("Failed to close tcClient"); + throwable = t; + } + } try { // Shutting down eventLoopGroup separately because in some cases, cnxPool might be using different // eventLoopGroup. @@ -747,14 +755,6 @@ public class PulsarClientImpl implements PulsarClient { throwable = t; } } - if (tcClient != null) { - try { - tcClient.close(); - } catch (Throwable t) { - log.warn("Failed to close tcClient"); - throwable = t; - } - } if (throwable != null) { throw throwable; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index f96cf57..ba6ee50 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; @@ -540,6 +541,12 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect @Override public void close() throws IOException { this.requestTimeout.cancel(); + this.setState(State.Closed); + } + + @VisibleForTesting + public State getConnectHandleState() { + return getState(); } @Override
