This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bff7916441e9ed89d20f25664088290cf856cf5a Author: Xiangying Meng <[email protected]> AuthorDate: Tue Dec 14 21:15:57 2021 +0800 [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect (#13135) ### Motivation and Modification We should not throw the following exceptions to the user to deal with. 1. `TransactionCoordinatorNotFound` or `ManagerLedgerFenceException` --- we should retry the operation and reconnect to TC 2. `TransactionMetaStoreHandler` was connecting ---- add the operation into `pendingRequests`, and executed the requests in `pendingRequests` when the connected completely. 3. The complexity of concurrent operations is too high. For operations in a TransactionMetaStoreHandler, consider using single-threaded operations --- use `internalPinnedExecutor` (cherry picked from commit 56323e4a5b70c3008706515acd871ba0571ec1eb) --- .../broker/TransactionMetadataStoreService.java | 8 +- .../apache/pulsar/broker/service/ServerCnx.java | 68 ++- .../impl}/TransactionClientConnectTest.java | 218 +++----- .../client/impl/TransactionMetaStoreHandler.java | 577 ++++++++++++++------- 4 files changed, 504 insertions(+), 367 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 240c6c9..3f3e8d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -356,7 +356,7 @@ public class TransactionMetadataStoreService { endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); } - completableFuture.completeExceptionally(e); + completableFuture.completeExceptionally(e.getCause()); return null; })).exceptionally(e -> { if (!isRetryableException(e.getCause())) { @@ -371,7 +371,7 @@ public class TransactionMetadataStoreService { endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); } - completableFuture.completeExceptionally(e); + completableFuture.completeExceptionally(e.getCause()); return null; }); } else { @@ -391,7 +391,7 @@ public class TransactionMetadataStoreService { LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, " + "TxnAction : {}", txnID, txnAction, e); } - completableFuture.completeExceptionally(e); + completableFuture.completeExceptionally(e.getCause()); return null; }); } else { @@ -409,7 +409,7 @@ public class TransactionMetadataStoreService { transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); } - completableFuture.completeExceptionally(e); + completableFuture.completeExceptionally(e.getCause()); return null; }); return completableFuture; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 7762c22..30ce9c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2003,7 +2003,26 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { return true; } } + private Throwable handleTxnException(Throwable ex, String op, long requestId) { + if (ex instanceof CoordinatorException.CoordinatorNotFoundException || ex != null + && ex.getCause() instanceof CoordinatorException.CoordinatorNotFoundException) { + if (log.isDebugEnabled()) { + log.debug("The Coordinator was not found for the request {}", op); + } + return ex; + } + if (ex instanceof ManagedLedgerException.ManagedLedgerFencedException || ex != null + && ex.getCause() instanceof ManagedLedgerException.ManagedLedgerFencedException) { + if (log.isDebugEnabled()) { + log.debug("Throw a CoordinatorNotFoundException to client " + + "with the message got from a ManagedLedgerFencedException for the request {}", op); + } + return new CoordinatorException.CoordinatorNotFoundException(ex.getMessage()); + } + log.error("Send response error for {} request {}.", op, requestId, ex); + return ex; + } @Override protected void handleNewTxn(CommandNewTxn command) { final long requestId = command.getRequestId(); @@ -2028,9 +2047,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits())); } else { - if (log.isDebugEnabled()) { - log.debug("Send response error for new txn request {}", requestId, ex); - } + ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId); ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); @@ -2066,19 +2083,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits())); } else { - if (log.isDebugEnabled()) { - log.debug("Send response error for add published partition to txn request {}", requestId, - ex); - } + ex = handleTxnException(ex, BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId); - if (ex instanceof CoordinatorException.CoordinatorNotFoundException) { - ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(), - BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); - } else { - ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(), - BrokerServiceException.getClientErrorCode(ex.getCause()), - ex.getCause().getMessage())); - } + ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(), + BrokerServiceException.getClientErrorCode(ex), + ex.getMessage())); transactionMetadataStoreService.handleOpFail(ex, tcId); } })); @@ -2105,16 +2114,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits())); } else { - log.error("Send response error for end txn request.", ex); + ex = handleTxnException(ex, BaseCommand.Type.END_TXN.name(), requestId); + ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), + BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); - if (ex instanceof CoordinatorException.CoordinatorNotFoundException) { - ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), - BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); - } else { - ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), - BrokerServiceException.getClientErrorCode(ex.getCause()), - ex.getCause().getMessage())); - } transactionMetadataStoreService.handleOpFail(ex, tcId); } }); @@ -2325,20 +2328,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { txnID.getLeastSigBits(), txnID.getMostSigBits())); log.info("handle add partition to txn finish."); } else { - if (log.isDebugEnabled()) { - log.debug("Send response error for add published partition to txn request {}", - requestId, ex); - } + ex = handleTxnException(ex, BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId); - if (ex instanceof CoordinatorException.CoordinatorNotFoundException) { - ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, - txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex), - ex.getMessage())); - } else { - ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, - txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex.getCause()), - ex.getCause().getMessage())); - } + ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, + txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex), + ex.getMessage())); transactionMetadataStoreService.handleOpFail(ex, tcId); } })); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java similarity index 50% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java index a51eae8..7fb924f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java @@ -16,19 +16,28 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.transaction; - +package org.apache.pulsar.client.impl; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.TransactionMetadataStoreService; +import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; import org.apache.pulsar.client.api.transaction.TxnID; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.client.impl.TransactionMetaStoreHandler; import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -39,10 +48,11 @@ import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertFalse; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -import static org.testng.FileAssert.fail; +@Slf4j public class TransactionClientConnectTest extends TransactionTestBase { private static final String RECONNECT_TOPIC = NAMESPACE1 + "/txn-client-reconnect-test"; @@ -60,142 +70,69 @@ public class TransactionClientConnectTest extends TransactionTestBase { @Test public void testTransactionNewReconnect() throws Exception { - start(); - - // when throw CoordinatorNotFoundException client will reconnect tc - try { - pulsarClient.newTransaction() - .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get(); - fail(); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException); - } - reconnect(); - - fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService()); - - // tc fence will remove this tc and reopen - try { - pulsarClient.newTransaction() - .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get(); - fail(); - } catch (ExecutionException e) { - assertEquals(e.getCause().getMessage(), - "org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerFencedException: " + - "java.lang.Exception: Attempted to use a fenced managed ledger"); - } - - reconnect(); + Callable<CompletableFuture<?>> callable = () -> pulsarClient.newTransaction() + .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build(); + tryCommandReconnect(callable, callable); } @Test public void testTransactionAddSubscriptionToTxnAsyncReconnect() throws Exception { TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient(); - start(); + Callable<CompletableFuture<?>> callable = () -> transactionCoordinatorClient + .addSubscriptionToTxnAsync(new TxnID(0, 0), "test", "test"); + tryCommandReconnect(callable, callable); + } + public void tryCommandReconnect(Callable<CompletableFuture<?>> callable1, Callable<CompletableFuture<?>> callable2) + throws Exception { + start(); try { - transactionCoordinatorClient.addSubscriptionToTxnAsync(new TxnID(0, 0), "test", "test").get(); - fail(); + callable1.call().get(); } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException); + assertFalse(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException); + waitToReady(); + callable1.call().get(); } - - reconnect(); fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService()); + CompletableFuture<?> completableFuture = callable2.call(); try { - transactionCoordinatorClient.addSubscriptionToTxnAsync(new TxnID(0, 0), "test", "test").get(); - fail(); + completableFuture.get(3, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { } catch (ExecutionException e) { - if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) { - assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found "); - } else { - assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger"); - } + Assert.assertFalse(e.getCause() + instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException); } - reconnect(); + + unFence(getPulsarServiceList().get(0).getTransactionMetadataStoreService()); + completableFuture.get(); } @Test public void testTransactionAbortToTxnAsyncReconnect() throws Exception { TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient(); - start(); - - try { - transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get(); - fail(); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException); - } - - reconnect(); - fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService()); - try { - transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get(); - fail(); - } catch (ExecutionException e) { - if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) { - assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found "); - } else { - assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger"); - } - } - reconnect(); + Callable<CompletableFuture<?>> callable1 = () -> transactionCoordinatorClient.abortAsync(new TxnID(0, + 0)); + Callable<CompletableFuture<?>> callable2 = () -> transactionCoordinatorClient.abortAsync(new TxnID(0, + 1)); + tryCommandReconnect(callable1, callable2); } @Test public void testTransactionCommitToTxnAsyncReconnect() throws Exception { TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient(); - start(); - - try { - transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get(); - fail(); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException); - } - - reconnect(); - fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService()); - try { - transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get(); - fail(); - } catch (ExecutionException e) { - if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) { - assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found "); - } else { - assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger"); - } - } - reconnect(); + Callable<CompletableFuture<?>> callable1 = () -> transactionCoordinatorClient.commitAsync(new TxnID(0, + 0)); + Callable<CompletableFuture<?>> callable2 = () -> transactionCoordinatorClient.commitAsync(new TxnID(0, + 1)); + tryCommandReconnect(callable1, callable2); } @Test public void testTransactionAddPublishPartitionToTxnReconnect() throws Exception { TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient(); - start(); - - try { - transactionCoordinatorClient.addPublishPartitionToTxnAsync(new TxnID(0, 0), - Collections.singletonList("test")).get(); - fail(); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException); - } - - reconnect(); - fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService()); - try { - transactionCoordinatorClient.addPublishPartitionToTxnAsync(new TxnID(0, 0), - Collections.singletonList("test")).get(); - fail(); - } catch (ExecutionException e) { - if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) { - assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found "); - } else { - assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger"); - } - } - reconnect(); + Callable<CompletableFuture<?>> callable = () -> transactionCoordinatorClient.addPublishPartitionToTxnAsync(new TxnID(0, 0), + Collections.singletonList("test")); + tryCommandReconnect(callable, callable); } @Test @@ -209,7 +146,11 @@ public class TransactionClientConnectTest extends TransactionTestBase { for (TransactionMetaStoreHandler handler : handlers) { handler.newTransactionAsync(10, TimeUnit.SECONDS).get(); } - pulsarClient.close(); + for (TransactionMetaStoreHandler handler : handlers) { + Field stateField = HandlerState.class.getDeclaredField("state"); + stateField.setAccessible(true); + stateField.set(handler, HandlerState.State.Closed); + } for (TransactionMetaStoreHandler handler : handlers) { Method method = TransactionMetaStoreHandler.class.getMethod("getConnectHandleState"); method.setAccessible(true); @@ -225,21 +166,14 @@ public class TransactionClientConnectTest extends TransactionTestBase { public void start() throws Exception { // wait transaction coordinator init success - Awaitility.await().until(() -> { - try { - pulsarClient.newTransaction() - .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get(); - } catch (Exception e) { - return false; - } - return true; - }); pulsarClient.newTransaction() - .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get(); + .withTransactionTimeout(30, TimeUnit.SECONDS).build().get(); + pulsarClient.newTransaction() + .withTransactionTimeout(30, TimeUnit.SECONDS).build().get(); TransactionMetadataStoreService transactionMetadataStoreService = getPulsarServiceList().get(0).getTransactionMetadataStoreService(); - // remove transaction metadata store + // remove transaction metadap0-ta store transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0)).get(); } @@ -250,15 +184,35 @@ public class TransactionClientConnectTest extends TransactionTestBase { field.set(((MLTransactionMetadataStore) transactionMetadataStoreService.getStores() .get(TransactionCoordinatorID.get(0))).getManagedLedger(), ManagedLedgerImpl.State.Fenced); } + public void unFence(TransactionMetadataStoreService transactionMetadataStoreService) throws Exception { + Field field = ManagedLedgerImpl.class.getDeclaredField("state"); + field.setAccessible(true); + field.set(((MLTransactionMetadataStore) transactionMetadataStoreService.getStores() + .get(TransactionCoordinatorID.get(0))).getManagedLedger(), ManagedLedgerImpl.State.LedgerOpened); + } - public void reconnect() { - //reconnect + public void waitToReady() throws Exception{ + TransactionMetadataStoreService transactionMetadataStoreService = + getPulsarServiceList().get(0).getTransactionMetadataStoreService(); + Class<TransactionMetadataStoreService> transactionMetadataStoreServiceClass = + TransactionMetadataStoreService.class; + Field field1 = + transactionMetadataStoreServiceClass.getDeclaredField("stores"); + field1.setAccessible(true); + Map<TransactionCoordinatorID, TransactionMetadataStore> stores = + (Map<TransactionCoordinatorID, TransactionMetadataStore>) field1 + .get(transactionMetadataStoreService); Awaitility.await().until(() -> { - try { - pulsarClient.newTransaction() - .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get(); - } catch (Exception e) { - return false; + for (TransactionMetadataStore transactionMetadataStore : stores.values()) { + Class<TransactionMetadataStoreState> transactionMetadataStoreStateClass = + TransactionMetadataStoreState.class; + Field field = transactionMetadataStoreStateClass.getDeclaredField("state"); + field.setAccessible(true); + TransactionMetadataStoreState.State state = + (TransactionMetadataStoreState.State) field.get(transactionMetadataStore); + if (!state.equals(TransactionMetadataStoreState.State.Ready)) { + return false; + } } return true; }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index ba6ee50..b2b756a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -23,7 +23,9 @@ import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; +import io.netty.util.Timer; import io.netty.util.TimerTask; +import java.util.concurrent.ExecutorService; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; import org.apache.pulsar.client.api.transaction.TxnID; @@ -61,6 +63,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect new ConcurrentLongHashMap<>(16, 1); private final ConcurrentLinkedQueue<RequestTime> timeoutQueue; + protected final Timer timer; + private final ExecutorService internalPinnedExecutor; + private static class RequestTime { final long creationTimeMs; final long requestId; @@ -96,6 +101,8 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect this); this.connectFuture = connectFuture; this.connectionHandler.grabCnx(); + this.timer = pulsarClient.timer(); + internalPinnedExecutor = pulsarClient.getInternalExecutorService(); } @Override @@ -109,64 +116,73 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect @Override public void connectionOpened(ClientCnx cnx) { - LOG.info("Transaction meta handler with transaction coordinator id {} connection opened.", - transactionCoordinatorId); - - if (getState() == State.Closing || getState() == State.Closed) { - setState(State.Closed); - failPendingRequest(); - this.pendingRequests.clear(); - return; - } - - connectionHandler.setClientCnx(cnx); - cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this); - - // if broker protocol version < 19, don't send TcClientConnectRequest to broker. - if (cnx.getRemoteEndpointProtocolVersion() > ProtocolVersion.v18.getValue()) { - long requestId = client.newRequestId(); - ByteBuf request = Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId); + internalPinnedExecutor.execute(() -> { + LOG.info("Transaction meta handler with transaction coordinator id {} connection opened.", + transactionCoordinatorId); + + if (getState() == State.Closing || getState() == State.Closed) { + setState(State.Closed); + failPendingRequest(); + this.pendingRequests.clear(); + return; + } - cnx.sendRequestWithId(request, requestId).thenRun(() -> { - LOG.info("Transaction coordinator client connect success! tcId : {}", transactionCoordinatorId); + connectionHandler.setClientCnx(cnx); + cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this); + + // if broker protocol version < 19, don't send TcClientConnectRequest to broker. + if (cnx.getRemoteEndpointProtocolVersion() > ProtocolVersion.v18.getValue()) { + long requestId = client.newRequestId(); + ByteBuf request = Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId); + + cnx.sendRequestWithId(request, requestId).thenRun(() -> { + internalPinnedExecutor.execute(() -> { + LOG.info("Transaction coordinator client connect success! tcId : {}", transactionCoordinatorId); + if (!changeToReadyState()) { + setState(State.Closed); + cnx.channel().close(); + } + + if (!this.connectFuture.isDone()) { + this.connectFuture.complete(null); + } + this.connectionHandler.resetBackoff(); + pendingRequests.forEach((requestID, opBase) -> checkStateAndSendRequest(opBase)); + }); + }).exceptionally((e) -> { + internalPinnedExecutor.execute(() -> { + LOG.error("Transaction coordinator client connect fail! tcId : {}", + transactionCoordinatorId, e.getCause()); + if (getState() == State.Closing || getState() == State.Closed + || e.getCause() instanceof PulsarClientException.NotAllowedException) { + setState(State.Closed); + cnx.channel().close(); + } else { + connectionHandler.reconnectLater(e.getCause()); + } + }); + return null; + }); + } else { if (!changeToReadyState()) { - setState(State.Closed); - cnx.channel().close(); - } - - if (!this.connectFuture.isDone()) { - this.connectFuture.complete(null); - } - this.connectionHandler.resetBackoff(); - }).exceptionally((e) -> { - LOG.error("Transaction coordinator client connect fail! tcId : {}", - transactionCoordinatorId, e.getCause()); - if (getState() == State.Closing || getState() == State.Closed - || e.getCause() instanceof PulsarClientException.NotAllowedException) { - setState(State.Closed); cnx.channel().close(); - } else { - connectionHandler.reconnectLater(e.getCause()); } - return null; - }); - } else { - if (!changeToReadyState()) { - cnx.channel().close(); + this.connectFuture.complete(null); } - this.connectFuture.complete(null); - } + }); } private void failPendingRequest() { - pendingRequests.keys().forEach(k -> { - OpBase<?> op = pendingRequests.remove(k); - if (op != null && !op.callback.isDone()) { - op.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException( - "Could not get response from transaction meta store when " + - "the transaction meta store has already close.")); - onResponse(op); - } + internalPinnedExecutor.execute(() -> { + pendingRequests.keys().forEach(k -> { + OpBase<?> op = pendingRequests.remove(k); + if (op != null && !op.callback.isDone()) { + op.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException( + "Could not get response from transaction meta store when " + + "the transaction meta store has already close.")); + onResponse(op); + } + }); }); } @@ -175,42 +191,76 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect LOG.debug("New transaction with timeout in ms {}", unit.toMillis(timeout)); } CompletableFuture<TxnID> callback = new CompletableFuture<>(); - if (!canSendRequest(callback)) { return callback; } long requestId = client.newRequestId(); ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, unit.toMillis(timeout)); - OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback); - pendingRequests.put(requestId, op); - timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); - cmd.retain(); - cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise()); + OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, client); + internalPinnedExecutor.execute(() -> { + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + checkStateAndSendRequest(op); + }); return callback; } void handleNewTxnResponse(CommandNewTxnResponse response) { - OpForTxnIdCallBack op = (OpForTxnIdCallBack) pendingRequests.remove(response.getRequestId()); - if (op == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Got new txn response for timeout {} - {}", response.getTxnidMostBits(), - response.getTxnidLeastBits()); + boolean hasError = response.hasError(); + ServerError error; + String message; + if (hasError) { + error = response.getError(); + message = response.getMessage(); + } else { + error = null; + message = null; + } + TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()); + long requestId = response.getRequestId(); + internalPinnedExecutor.execute(() -> { + OpForTxnIdCallBack op = (OpForTxnIdCallBack) pendingRequests.remove(requestId); + if (op == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got new txn response for timeout {} - {}", txnID.getMostSigBits(), + txnID.getLeastSigBits()); + } + return; } - return; - } - if (!response.hasError()) { - TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()); - if (LOG.isDebugEnabled()) { - LOG.debug("Got new txn response {} for request {}", txnID, response.getRequestId()); + if (!hasError) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got new txn response {} for request {}", txnID, requestId); + } + op.callback.complete(txnID); + } else { + if (checkIfNeedRetryByError(error, message, op)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for the {} request {} error " + + "TransactionCoordinatorNotFound and try it again", + BaseCommand.Type.NEW_TXN.name(), requestId); + } + pendingRequests.put(requestId, op); + timer.newTimeout(timeout -> { + internalPinnedExecutor.execute(() -> { + if (!pendingRequests.containsKey(requestId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("The request {} already timeout", requestId); + } + return; + } + checkStateAndSendRequest(op); + }); + } + , op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } + LOG.error("Got {} for request {} error {}", BaseCommand.Type.NEW_TXN.name(), + requestId, error); } - op.callback.complete(txnID); - } else { - LOG.error("Got new txn for request {} error {}", response.getRequestId(), response.getError()); - handleTransactionFailOp(response.getError(), response.getMessage(), op); - } - onResponse(op); + onResponse(op); + }); } public CompletableFuture<Void> addPublishPartitionToTxnAsync(TxnID txnID, List<String> partitions) { @@ -218,42 +268,80 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect LOG.debug("Add publish partition {} to txn {}", partitions, txnID); } CompletableFuture<Void> callback = new CompletableFuture<>(); - if (!canSendRequest(callback)) { return callback; } long requestId = client.newRequestId(); ByteBuf cmd = Commands.newAddPartitionToTxn( requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), partitions); - OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback); - pendingRequests.put(requestId, op); - timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); - cmd.retain(); - cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise()); + OpForVoidCallBack op = OpForVoidCallBack + .create(cmd, callback, client); + internalPinnedExecutor.execute(() -> { + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + checkStateAndSendRequest(op); + }); + return callback; } void handleAddPublishPartitionToTxnResponse(CommandAddPartitionToTxnResponse response) { - OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId()); - if (op == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Got add publish partition to txn response for timeout {} - {}", response.getTxnidMostBits(), - response.getTxnidLeastBits()); + boolean hasError = response.hasError(); + ServerError error; + String message; + if (hasError) { + error = response.getError(); + message = response.getMessage(); + } else { + error = null; + message = null; + } + TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()); + long requestId = response.getRequestId(); + internalPinnedExecutor.execute(() -> { + OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId); + if (op == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got add publish partition to txn response for timeout {} - {}", txnID.getMostSigBits(), + txnID.getLeastSigBits()); + } + return; } - return; - } - if (!response.hasError()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Add publish partition for request {} success.", response.getRequestId()); + if (!hasError) { + if (LOG.isDebugEnabled()) { + LOG.debug("Add publish partition for request {} success.", requestId); + } + op.callback.complete(null); + } else { + if (checkIfNeedRetryByError(error, message, op)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for the {} request {} " + + " error TransactionCoordinatorNotFound and try it again", + BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId); + } + pendingRequests.put(requestId, op); + timer.newTimeout(timeout -> { + internalPinnedExecutor.execute(() -> { + if (!pendingRequests.containsKey(requestId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("The request {} already timeout", requestId); + } + return; + } + checkStateAndSendRequest(op); + }); + } + , op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } + LOG.error("{} for request {} error {} with txnID {}.", BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), + requestId, error, txnID); + } - op.callback.complete(null); - } else { - LOG.error("Add publish partition for request {} error {}.", response.getRequestId(), response.getError()); - handleTransactionFailOp(response.getError(), response.getMessage(), op); - } - onResponse(op); + onResponse(op); + }); } public CompletableFuture<Void> addSubscriptionToTxn(TxnID txnID, List<Subscription> subscriptionList) { @@ -262,41 +350,76 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect } CompletableFuture<Void> callback = new CompletableFuture<>(); - if (!canSendRequest(callback)) { return callback; } long requestId = client.newRequestId(); ByteBuf cmd = Commands.newAddSubscriptionToTxn( requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList); - OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback); - pendingRequests.put(requestId, op); - timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); - cmd.retain(); - cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise()); + OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client); + internalPinnedExecutor.execute(() -> { + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + checkStateAndSendRequest(op); + }); return callback; } public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse response) { - OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId()); - if (op == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Add subscription to txn timeout for request {}.", response.getRequestId()); + boolean hasError = response.hasError(); + ServerError error; + String message; + if (hasError) { + error = response.getError(); + message = response.getMessage(); + } else { + error = null; + message = null; + } + long requestId = response.getRequestId(); + internalPinnedExecutor.execute(() -> { + OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId); + if (op == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Add subscription to txn timeout for request {}.", requestId); + } + return; } - return; - } - if (!response.hasError()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Add subscription to txn success for request {}.", response.getRequestId()); + if (!hasError) { + if (LOG.isDebugEnabled()) { + LOG.debug("Add subscription to txn success for request {}.", requestId); + } + op.callback.complete(null); + } else { + LOG.error("Add subscription to txn failed for request {} error {}.", + requestId, error); + if (checkIfNeedRetryByError(error, message, op)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for {} request {} error TransactionCoordinatorNotFound and try it again", + BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId); + } + pendingRequests.put(requestId, op); + timer.newTimeout(timeout -> { + internalPinnedExecutor.execute(() -> { + if (!pendingRequests.containsKey(requestId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("The request {} already timeout", requestId); + } + return; + } + checkStateAndSendRequest(op); + }); + } + , op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } + LOG.error("{} failed for request {} error {}.", BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), + requestId, error); + } - op.callback.complete(null); - } else { - LOG.error("Add subscription to txn failed for request {} error {}.", - response.getRequestId(), response.getError()); - handleTransactionFailOp(response.getError(), response.getMessage(), op); - } - onResponse(op); + onResponse(op); + }); } public CompletableFuture<Void> endTxnAsync(TxnID txnID, TxnAction action) { @@ -304,68 +427,115 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect LOG.debug("End txn {}, action {}", txnID, action); } CompletableFuture<Void> callback = new CompletableFuture<>(); - if (!canSendRequest(callback)) { return callback; } long requestId = client.newRequestId(); BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), action); ByteBuf buf = Commands.serializeWithSize(cmd); - OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback); - pendingRequests.put(requestId, op); - timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); - buf.retain(); - cnx().ctx().writeAndFlush(buf, cnx().ctx().voidPromise()); + OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client); + internalPinnedExecutor.execute(() -> { + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + checkStateAndSendRequest(op); + }); return callback; } void handleEndTxnResponse(CommandEndTxnResponse response) { - OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId()); - if (op == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Got end txn response for timeout {} - {}", response.getTxnidMostBits(), - response.getTxnidLeastBits()); + boolean hasError = response.hasError(); + ServerError error; + String message; + if (hasError) { + error = response.getError(); + message = response.getMessage(); + } else { + error = null; + message = null; + } + TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()); + long requestId = response.getRequestId(); + internalPinnedExecutor.execute(() -> { + OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId); + if (op == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got end txn response for timeout {} - {}", txnID.getMostSigBits(), + txnID.getLeastSigBits()); + } + return; } - return; - } - if (!response.hasError()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Got end txn response success for request {}", response.getRequestId()); - } - op.callback.complete(null); - } else { - LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); - handleTransactionFailOp(response.getError(), response.getMessage(), op); - } + if (!hasError) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got end txn response success for request {}", requestId); + } + op.callback.complete(null); + } else { + if (checkIfNeedRetryByError(error, message, op)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for the {} request {} error " + + "TransactionCoordinatorNotFound and try it again", + BaseCommand.Type.END_TXN.name(), requestId); + } + pendingRequests.put(requestId, op); + timer.newTimeout(timeout -> { + internalPinnedExecutor.execute(() -> { + if (!pendingRequests.containsKey(requestId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("The request {} already timeout", requestId); + } + return; + } + checkStateAndSendRequest(op); + }); + } + , op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } + LOG.error("Got {} response for request {} error {}", BaseCommand.Type.END_TXN.name(), + requestId, error); - onResponse(op); + } + onResponse(op); + }); } - private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) { - if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) { - connectionHandler.reconnectLater(new TransactionCoordinatorClientException - .CoordinatorNotFoundException(message)); + + private boolean checkIfNeedRetryByError(ServerError error, String message, OpBase<?> op) { + if (error == ServerError.TransactionCoordinatorNotFound) { + if (getState() != State.Connecting) { + connectionHandler.reconnectLater(new TransactionCoordinatorClientException + .CoordinatorNotFoundException(message)); + } + return true; } if (op != null) { op.callback.completeExceptionally(getExceptionByServerError(error, message)); } + return false; } private static abstract class OpBase<T> { protected ByteBuf cmd; protected CompletableFuture<T> callback; + protected Backoff backoff; abstract void recycle(); } private static class OpForTxnIdCallBack extends OpBase<TxnID> { - static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback) { + static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback, PulsarClientImpl client) { OpForTxnIdCallBack op = RECYCLER.get(); op.callback = callback; op.cmd = cmd; + op.backoff = new BackoffBuilder() + .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), + TimeUnit.NANOSECONDS) + .setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS) + .setMandatoryStop(0, TimeUnit.MILLISECONDS) + .create(); return op; } @@ -375,6 +545,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect @Override void recycle() { + this.backoff = null; + this.cmd = null; + this.callback = null; recyclerHandle.recycle(this); } @@ -389,18 +562,29 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect private static class OpForVoidCallBack extends OpBase<Void> { - static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback) { + + static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback, PulsarClientImpl client) { OpForVoidCallBack op = RECYCLER.get(); op.callback = callback; op.cmd = cmd; + op.backoff = new BackoffBuilder() + .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), + TimeUnit.NANOSECONDS) + .setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS) + .setMandatoryStop(0, TimeUnit.MILLISECONDS) + .create(); return op; } + private OpForVoidCallBack(Recycler.Handle<OpForVoidCallBack> recyclerHandle) { this.recyclerHandle = recyclerHandle; } @Override void recycle() { + this.backoff = null; + this.cmd = null; + this.callback = null; recyclerHandle.recycle(this); } @@ -433,9 +617,6 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect } private boolean canSendRequest(CompletableFuture<?> callback) { - if (!isValidHandlerState(callback)) { - return false; - } try { if (blockIfReachMaxPendingOps) { semaphore.acquire(); @@ -453,81 +634,89 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect return true; } - private boolean isValidHandlerState(CompletableFuture<?> callback) { + private void checkStateAndSendRequest(OpBase<?> op) { switch (getState()) { case Ready: - return true; + ClientCnx cnx = cnx(); + if (cnx != null) { + op.cmd.retain(); + cnx.ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise()); + } else { + LOG.error("The cnx was null when the TC handler was ready", new NullPointerException()); + } + break; case Connecting: - callback.completeExceptionally( - new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException( - "Transaction meta store handler for tcId " - + transactionCoordinatorId - + " is connecting now, please try later.")); - return false; + break; case Closing: case Closed: - callback.completeExceptionally( + op.callback.completeExceptionally( new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException( "Transaction meta store handler for tcId " + transactionCoordinatorId + " is closing or closed.")); - return false; + onResponse(op); + break; case Failed: case Uninitialized: - callback.completeExceptionally( + op.callback.completeExceptionally( new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException( "Transaction meta store handler for tcId " + transactionCoordinatorId + " not connected.")); - return false; + onResponse(op); + break; default: - callback.completeExceptionally( + op.callback.completeExceptionally( new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException( transactionCoordinatorId)); - return false; + onResponse(op); + break; } } @Override public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled()) { - return; - } - long timeToWaitMs; - if (getState() == State.Closing || getState() == State.Closed) { - return; - } - RequestTime peeked = timeoutQueue.peek(); - while (peeked != null && peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs() - - System.currentTimeMillis() <= 0) { - RequestTime lastPolled = timeoutQueue.poll(); - if (lastPolled != null) { - OpBase<?> op = pendingRequests.remove(lastPolled.requestId); - if (op != null && !op.callback.isDone()) { - op.callback.completeExceptionally(new PulsarClientException.TimeoutException( - "Could not get response from transaction meta store within given timeout.")); - if (LOG.isDebugEnabled()) { - LOG.debug("Transaction coordinator request {} is timeout.", lastPolled.requestId); + internalPinnedExecutor.execute(() -> { + if (timeout.isCancelled()) { + return; + } + long timeToWaitMs; + if (getState() == State.Closing || getState() == State.Closed) { + return; + } + RequestTime peeked = timeoutQueue.peek(); + while (peeked != null && peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs() + - System.currentTimeMillis() <= 0) { + RequestTime lastPolled = timeoutQueue.poll(); + if (lastPolled != null) { + OpBase<?> op = pendingRequests.remove(lastPolled.requestId); + if (op != null && !op.callback.isDone()) { + op.callback.completeExceptionally(new PulsarClientException.TimeoutException( + "Could not get response from transaction meta store within given timeout.")); + if (LOG.isDebugEnabled()) { + LOG.debug("Transaction coordinator request {} is timeout.", lastPolled.requestId); + } + onResponse(op); } - onResponse(op); + } else { + break; } - } else { - break; + peeked = timeoutQueue.peek(); } - peeked = timeoutQueue.peek(); - } - if (peeked == null) { - timeToWaitMs = client.getConfiguration().getOperationTimeoutMs(); - } else { - long diff = (peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs()) - System.currentTimeMillis(); - if (diff <= 0) { + if (peeked == null) { timeToWaitMs = client.getConfiguration().getOperationTimeoutMs(); } else { - timeToWaitMs = diff; + long diff = (peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs()) + - System.currentTimeMillis(); + if (diff <= 0) { + timeToWaitMs = client.getConfiguration().getOperationTimeoutMs(); + } else { + timeToWaitMs = diff; + } } - } - requestTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS); + requestTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS); + }); } private ClientCnx cnx() {
