codelipenghui commented on a change in pull request #14894:
URL: https://github.com/apache/pulsar/pull/14894#discussion_r838543270
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -87,7 +96,18 @@ public TransactionBufferHandlerImpl(PulsarClient
pulsarClient,
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits,
txnIdMostBits,
topic, action, lowWaterMark);
- return endTxn(requestId, topic, cmd, cb);
+
+ try {
Review comment:
`lookupCache.get(topic)` can throw an `ExecutionException`, which means an
error occurred while loading the entry in the lookup cache. We need to catch
that exception and invalidate the cached entry for the topic.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -102,79 +122,101 @@ public TransactionBufferHandlerImpl(PulsarClient
pulsarClient,
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId,
txnIdLeastBits, txnIdMostBits,
topic, subscription, action, lowWaterMark);
- return endTxn(requestId, topic, cmd, cb);
- }
-
- private CompletableFuture<TxnID> endTxn(long requestId, String topic,
ByteBuf cmd, CompletableFuture<TxnID> cb) {
- OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
try {
- cache.get(topic).whenComplete((clientCnx, throwable) -> {
- if (throwable == null) {
- if (clientCnx.ctx().channel().isActive()) {
-
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
- pendingRequests.put(requestId, op);
- timer.newTimeout(timeout -> {
- OpRequestSend peek =
pendingRequests.remove(requestId);
- if (peek != null && !peek.cb.isDone() &&
!peek.cb.isCompletedExceptionally()) {
- peek.cb.completeExceptionally(new
TransactionBufferClientException
- .RequestTimeoutException());
- onResponse(peek);
- }
- }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
- cmd.retain();
- clientCnx.ctx().writeAndFlush(cmd,
clientCnx.ctx().voidPromise());
- } else {
- cache.invalidate(topic);
- cb.completeExceptionally(
- new
PulsarClientException.LookupException(topic + " endTxn channel is not active"));
- op.recycle();
- }
- } else {
- log.error("endTxn error topic: [{}]", topic, throwable);
- cache.invalidate(topic);
- cb.completeExceptionally(
- new
PulsarClientException.LookupException(throwable.getMessage()));
- op.recycle();
- }
- });
+ OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb,
lookupCache.get(topic));
+ if (checkRequestCredits(op)) {
+ endTxn(op);
+ }
} catch (ExecutionException e) {
- log.error("endTxn channel is not active exception", e);
- cache.invalidate(topic);
+ log.error("[{}] failed to get client cnx from lookup cache",
topic, e);
+ lookupCache.invalidate(topic);
cb.completeExceptionally(new
PulsarClientException.LookupException(e.getCause().getMessage()));
- op.recycle();
}
return cb;
}
+ private boolean checkRequestCredits(OpRequestSend op) {
+ int currentPermits = REQUEST_CREDITS_UPDATER.get(this);
+ if (currentPermits > 0 && pendingRequests.peek() == null) {
+ if (REQUEST_CREDITS_UPDATER.compareAndSet(this, currentPermits,
currentPermits - 1)) {
+ return true;
+ } else {
+ return checkRequestCredits(op);
+ }
+ } else {
+ pendingRequests.add(op);
+ return false;
+ }
+ }
+
+ public void endTxn(OpRequestSend op) {
+ op.cnx.whenComplete((clientCnx, throwable) -> {
+ if (throwable == null) {
+ if (clientCnx.ctx().channel().isActive()) {
+
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
+ outstandingRequests.put(op.requestId, op);
+ timer.newTimeout(timeout -> {
+ OpRequestSend peek =
outstandingRequests.remove(op.requestId);
+ if (peek != null && !peek.cb.isDone() &&
!peek.cb.isCompletedExceptionally()) {
+ peek.cb.completeExceptionally(new
TransactionBufferClientException
+ .RequestTimeoutException());
+ onResponse(peek);
+ }
+ }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
+ op.cmd.retain();
+ clientCnx.ctx().writeAndFlush(op.cmd,
clientCnx.ctx().voidPromise());
+ } else {
+ invalidateLookupCache(op);
+ op.cb.completeExceptionally(
+ new PulsarClientException.LookupException(op.topic
+ " endTxn channel is not active"));
+ onResponse(op);
+ }
+ } else {
+ log.error("endTxn error topic: [{}]", op.topic, throwable);
+ invalidateLookupCache(op);
+ op.cb.completeExceptionally(
+ new
PulsarClientException.LookupException(throwable.getMessage()));
+ onResponse(op);
+ }
+ });
+ }
+
@Override
public void handleEndTxnOnTopicResponse(long requestId,
CommandEndTxnOnPartitionResponse response) {
- OpRequestSend op = pendingRequests.remove(requestId);
+ OpRequestSend op = outstandingRequests.remove(requestId);
if (op == null) {
if (log.isDebugEnabled()) {
log.debug("Got end txn on topic response for timeout {} - {}",
response.getTxnidMostBits(),
response.getTxnidLeastBits());
}
return;
}
-
- if (!response.hasError()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Got end txn on topic response for for request
{}", op.topic, response.getRequestId());
+ try {
+ if (!response.hasError()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Got end txn on topic response for for
request {}", op.topic,
+ response.getRequestId());
+ }
+ op.cb.complete(new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits()));
+ } else {
+ log.error("[{}] Got end txn on topic response for request {}
error {}", op.topic,
+ response.getRequestId(),
+ response.getError());
+ invalidateLookupCache(op);
+
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
+ response.getMessage()));
}
- op.cb.complete(new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits()));
- } else {
- log.error("[{}] Got end txn on topic response for request {} error
{}", op.topic, response.getRequestId(),
- response.getError());
- cache.invalidate(op.topic);
-
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
response.getMessage()));
+ } catch (Exception e) {
Review comment:
I just want to add an error log in case an exception is thrown while
completing the callback; the `finally` block ensures that `onResponse()` is
always called.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -183,41 +225,98 @@ public void handleEndTxnOnSubscriptionResponse(long
requestId,
return;
}
- if (!response.hasError()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Got end txn on subscription response for for
request {}",
- op.topic, response.getRequestId());
+ try {
+ if (!response.hasError()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Got end txn on subscription response for
for request {}",
+ op.topic, response.getRequestId());
+ }
+ op.cb.complete(new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits()));
+ } else {
+ log.error("[{}] Got end txn on subscription response for
request {} error {}",
+ op.topic, response.getRequestId(),
response.getError());
+ invalidateLookupCache(op);
+
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
+ response.getMessage()));
}
- op.cb.complete(new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits()));
- } else {
- log.error("[{}] Got end txn on subscription response for request
{} error {}",
- op.topic, response.getRequestId(), response.getError());
- cache.invalidate(op.topic);
-
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
response.getMessage()));
+ } catch (Exception e) {
+ log.error("[{}] Got exception when complete EndTxnOnSub op for
request {}", op.topic, e);
+ } finally {
+ onResponse(op);
}
- onResponse(op);
}
- void onResponse(OpRequestSend op) {
- ReferenceCountUtil.safeRelease(op.byteBuf);
- op.recycle();
+ public void onResponse(OpRequestSend op) {
+ REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+ if (op != null) {
+ ReferenceCountUtil.safeRelease(op.cmd);
+ op.recycle();
+ }
+ checkPendingRequests();
}
- private static final class OpRequestSend {
+ private void checkPendingRequests() {
+ while (true) {
+ int permits = REQUEST_CREDITS_UPDATER.get(this);
+ if (permits > 0 && pendingRequests.peek() != null) {
+ if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits,
permits - 1)) {
+ OpRequestSend polled = pendingRequests.poll();
+ if (polled != null) {
+ try {
+ if (polled.cnx != lookupCache.get(polled.topic)) {
+ OpRequestSend invalid = polled;
+ polled =
OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
+ lookupCache.get(invalid.topic));
+ invalid.recycle();
+ }
+ endTxn(polled);
+ } catch (ExecutionException e) {
+ log.error("[{}] failed to get client cnx from
lookup cache", polled.topic, e);
+ lookupCache.invalidate(polled.topic);
+ polled.cb.completeExceptionally(new
PulsarClientException.LookupException(
+ e.getCause().getMessage()));
+ REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+ }
+ } else {
+ REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+ }
+ } else {
+ checkPendingRequests();
Review comment:
Nice catch!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]