congbobo184 commented on a change in pull request #13135: URL: https://github.com/apache/pulsar/pull/13135#discussion_r763747147
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java ########## @@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) { } op.callback.complete(null); } else { - LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); handleTransactionFailOp(response.getError(), response.getMessage(), op); + if (response.getError() == ServerError.TransactionCoordinatorNotFound) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again", + response.getRequestId()); + } + timer.newTimeout(timeout -> { + long requestId = client.newRequestId(); + TxnID txnID = op.txnID; + TxnAction action = op.action; + BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), + txnID.getMostSigBits(), action); + OpForEndTxnCallBack opNew = OpForEndTxnCallBack + .create(Commands.serializeWithSize(cmd), op.callback, txnID, action); + op.cmd.release(); + op.recycle(); + tryExecuteCommandAgain(opNew, requestId); + }, op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } else { + LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); + } } - onResponse(op); } + private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) { + if (cnx() == null) { + timer.newTimeout(timeout -> + tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise()); + } + private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) { if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) { Review comment: ``` if (error == ServerError.TransactionCoordinatorNotFound) { if (getState() != 
State.Connecting) { connectionHandler.reconnectLater(new TransactionCoordinatorClientException .CoordinatorNotFoundException(message)); } return; } ``` ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java ########## @@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) { } op.callback.complete(null); } else { - LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); handleTransactionFailOp(response.getError(), response.getMessage(), op); + if (response.getError() == ServerError.TransactionCoordinatorNotFound) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again", + response.getRequestId()); + } + timer.newTimeout(timeout -> { + long requestId = client.newRequestId(); + TxnID txnID = op.txnID; + TxnAction action = op.action; + BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), + txnID.getMostSigBits(), action); + OpForEndTxnCallBack opNew = OpForEndTxnCallBack + .create(Commands.serializeWithSize(cmd), op.callback, txnID, action); + op.cmd.release(); + op.recycle(); + tryExecuteCommandAgain(opNew, requestId); + }, op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } else { Review comment: The branch above already returns, so we don't need the else.
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java ########## @@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) { } op.callback.complete(null); } else { - LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); handleTransactionFailOp(response.getError(), response.getMessage(), op); + if (response.getError() == ServerError.TransactionCoordinatorNotFound) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again", + response.getRequestId()); + } + timer.newTimeout(timeout -> { + long requestId = client.newRequestId(); + TxnID txnID = op.txnID; + TxnAction action = op.action; + BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), + txnID.getMostSigBits(), action); + OpForEndTxnCallBack opNew = OpForEndTxnCallBack + .create(Commands.serializeWithSize(cmd), op.callback, txnID, action); + op.cmd.release(); + op.recycle(); + tryExecuteCommandAgain(opNew, requestId); + }, op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } else { + LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); + } } - onResponse(op); } + private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) { + if (cnx() == null) { + timer.newTimeout(timeout -> + tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise()); Review comment: should use local variable, otherwise it will produce NPE. 
Code like this: ``` ClientCnx cnx = cnx(); if (cnx == null) { timer.newTimeout(timeout -> tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS); return; } pendingRequests.put(requestId, op); timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); cnx.ctx().writeAndFlush(op.cmd, cnx.ctx().voidPromise()); ``` ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java ########## @@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) { } op.callback.complete(null); } else { - LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); handleTransactionFailOp(response.getError(), response.getMessage(), op); + if (response.getError() == ServerError.TransactionCoordinatorNotFound) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again", + response.getRequestId()); + } + timer.newTimeout(timeout -> { + long requestId = client.newRequestId(); + TxnID txnID = op.txnID; + TxnAction action = op.action; + BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), + txnID.getMostSigBits(), action); + OpForEndTxnCallBack opNew = OpForEndTxnCallBack + .create(Commands.serializeWithSize(cmd), op.callback, txnID, action); + op.cmd.release(); + op.recycle(); + tryExecuteCommandAgain(opNew, requestId); + }, op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } else { + LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); + } } - onResponse(op); } + private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) { + if (cnx() == null) { + timer.newTimeout(timeout -> + tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(),
requestId)); + cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise()); + } + private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) { if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) { connectionHandler.reconnectLater(new TransactionCoordinatorClientException .CoordinatorNotFoundException(message)); } - if (op != null) { + if (op != null && error != ServerError.TransactionCoordinatorNotFound) { op.callback.completeExceptionally(getExceptionByServerError(error, message)); } } private static abstract class OpBase<T> { protected ByteBuf cmd; protected CompletableFuture<T> callback; + protected Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, 3, TimeUnit.SECONDS, 10, + TimeUnit.SECONDS); abstract void recycle(); } - private static class OpForTxnIdCallBack extends OpBase<TxnID> { + private static class OpForNewTxnCallBack extends OpBase<TxnID> { - static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback) { - OpForTxnIdCallBack op = RECYCLER.get(); + protected long timeout; + protected TimeUnit unit; + static OpForNewTxnCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback, long timeout, TimeUnit unit) { + OpForNewTxnCallBack op = RECYCLER.get(); op.callback = callback; op.cmd = cmd; + op.timeout = timeout; + op.unit = unit; Review comment: every field need to be set initial value like `org.apache.bookkeeper.mledger.impl.OpReadEntry.recycle()` ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java ########## @@ -295,6 +340,28 @@ public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnRespon LOG.error("Add subscription to txn failed for request {} error {}.", response.getRequestId(), response.getError()); handleTransactionFailOp(response.getError(), response.getMessage(), op); + if (response.getError() == ServerError.TransactionCoordinatorNotFound) { + if (LOG.isDebugEnabled()) { 
+ LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again", + response.getRequestId()); + } + timer.newTimeout(timeout -> { + long requestId = client.newRequestId(); + TxnID txnID = op.txnID; + List<Subscription> subscriptionList = op.subscriptionList; + ByteBuf cmd = Commands.newAddSubscriptionToTxn( + requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList); + OpForAddSubscriptionToTxnCallBack opNew = OpForAddSubscriptionToTxnCallBack + .create(cmd, op.callback, txnID, subscriptionList); + op.cmd.release(); Review comment: use ReferenceCountUtil.safeRelease() is better. ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java ########## @@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) { } op.callback.complete(null); } else { - LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); handleTransactionFailOp(response.getError(), response.getMessage(), op); + if (response.getError() == ServerError.TransactionCoordinatorNotFound) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again", + response.getRequestId()); + } + timer.newTimeout(timeout -> { + long requestId = client.newRequestId(); + TxnID txnID = op.txnID; + TxnAction action = op.action; + BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), + txnID.getMostSigBits(), action); + OpForEndTxnCallBack opNew = OpForEndTxnCallBack + .create(Commands.serializeWithSize(cmd), op.callback, txnID, action); + op.cmd.release(); + op.recycle(); + tryExecuteCommandAgain(opNew, requestId); + }, op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } else { Review comment: use return, we don't need else. 
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java ########## @@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) { } op.callback.complete(null); } else { - LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); handleTransactionFailOp(response.getError(), response.getMessage(), op); + if (response.getError() == ServerError.TransactionCoordinatorNotFound) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again", + response.getRequestId()); + } + timer.newTimeout(timeout -> { + long requestId = client.newRequestId(); + TxnID txnID = op.txnID; + TxnAction action = op.action; + BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), + txnID.getMostSigBits(), action); + OpForEndTxnCallBack opNew = OpForEndTxnCallBack + .create(Commands.serializeWithSize(cmd), op.callback, txnID, action); + op.cmd.release(); + op.recycle(); + tryExecuteCommandAgain(opNew, requestId); + }, op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } else { + LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); + } } - onResponse(op); } + private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) { + if (cnx() == null) { + timer.newTimeout(timeout -> + tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise()); + } + private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) { if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) { connectionHandler.reconnectLater(new TransactionCoordinatorClientException 
.CoordinatorNotFoundException(message)); } - if (op != null) { + if (op != null && error != ServerError.TransactionCoordinatorNotFound) { Review comment: Don't change this. ########## File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java ########## @@ -228,14 +198,14 @@ public void start() throws Exception { Awaitility.await().until(() -> { Review comment: Now we don't need to use this logic. The same as reconnect. ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java ########## @@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) { } op.callback.complete(null); } else { - LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); handleTransactionFailOp(response.getError(), response.getMessage(), op); + if (response.getError() == ServerError.TransactionCoordinatorNotFound) { + if (LOG.isDebugEnabled()) { + LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again", + response.getRequestId()); + } + timer.newTimeout(timeout -> { + long requestId = client.newRequestId(); + TxnID txnID = op.txnID; + TxnAction action = op.action; + BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), Review comment: The cmd should be retained, otherwise it will be released in `cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise());` ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java ########## @@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) { } op.callback.complete(null); } else { - LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); handleTransactionFailOp(response.getError(), response.getMessage(), op); + if (response.getError() == ServerError.TransactionCoordinatorNotFound) { + if
(LOG.isDebugEnabled()) { + LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again", + response.getRequestId()); + } + timer.newTimeout(timeout -> { + long requestId = client.newRequestId(); + TxnID txnID = op.txnID; + TxnAction action = op.action; + BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), + txnID.getMostSigBits(), action); + OpForEndTxnCallBack opNew = OpForEndTxnCallBack + .create(Commands.serializeWithSize(cmd), op.callback, txnID, action); + op.cmd.release(); + op.recycle(); + tryExecuteCommandAgain(opNew, requestId); + }, op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } else { + LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); + } } - onResponse(op); } + private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) { + if (cnx() == null) { + timer.newTimeout(timeout -> + tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS); + return; + } + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise()); + } + private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) { if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) { connectionHandler.reconnectLater(new TransactionCoordinatorClientException .CoordinatorNotFoundException(message)); } - if (op != null) { + if (op != null && error != ServerError.TransactionCoordinatorNotFound) { op.callback.completeExceptionally(getExceptionByServerError(error, message)); } } private static abstract class OpBase<T> { protected ByteBuf cmd; protected CompletableFuture<T> callback; + protected Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, 3, TimeUnit.SECONDS, 10, Review comment: client.getConfiguration().getInitialBackoffIntervalNanos() 
client.getConfiguration().getMaxBackoffIntervalNanos() Use these to configure the backoff. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org