congbobo184 opened a new issue #13918:
URL: https://github.com/apache/pulsar/issues/13918
Authors: @congbobo184 @codelipenghui
# Background
More and more people and companies are testing and using transactions.
However, the transaction API is not clearly defined. Users don't know which
exceptions to catch, or at what point a transaction can be committed or
aborted.
# Motivation
Let's take a look at how the transaction API is currently used.
```
Message<String> message = consumer1.receive();
try {
    Transaction transaction = client.newTransaction()
            .withTransactionTimeout(10, TimeUnit.SECONDS)
            .build()
            .get();
    consumer1.acknowledgeAsync(message.getMessageId(), transaction);
    // TypedMessageBuilder takes the payload via value(); sendAsync() has no arguments.
    producer.newMessage(transaction).value(message.getValue()).sendAsync();
    transaction.commit().get();
} catch (Exception ignored) {
}
```
This is the internal code of transaction commit.
```
public CompletableFuture<Void> commit() {
    return checkIfOpenOrCommitting().thenCompose((value) -> {
        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
        this.state = State.COMMITTING;
        allOpComplete().whenComplete((v, e) -> {
            if (e != null) {
                // A produce or ack failed: the client aborts even though commit was called.
                abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
            } else {
                tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits))
                        .whenComplete((vx, ex) -> {
                            if (ex != null) {
                                if (ex instanceof TransactionNotFoundException
                                        || ex instanceof InvalidTxnStatusException) {
                                    this.state = State.ERROR;
                                }
                                commitFuture.completeExceptionally(ex);
                            } else {
                                this.state = State.COMMITTED;
                                commitFuture.complete(vx);
                            }
                        });
            }
        });
        return commitFuture;
    });
}
```
It has the following problems:
**Problem 1:** The code above can throw in two places: when creating the
transaction and when ending it. But we don't know which exceptions each
operation can throw. If creating the transaction throws, we should create it
again; if committing the transaction throws, we should commit again.
**Problem 2:** As the commit code shows, a transaction can only be committed
after all of its operations (ack with transaction and produce with
transaction) have completed successfully. But even when an ack or a produce
throws, some exceptions, such as a TimeoutException, can be handled by
retrying the ack or the produce. This does not affect the exactly-once
semantics of the transaction: a produced transaction message can rely on
message deduplication, and an ack can be retried as long as the exception is
not a TransactionConflictException.
**Problem 3:** When a transaction op has failed and the user calls commit, the
client calls abort instead. This behavior confuses users: why does the client
abort the transaction when I called commit?
# Approach
## solve problem 1 : Handling exceptions of new transaction and commit transaction
new transaction : The only exceptions we can be sure of here are the ones
thrown when the Pulsar client is closed or the broker does not support
transactions.
commit or abort transaction : The user needs to handle the cases where the
client is closed, the broker has transactions disabled, or a
TransactionNotFoundException or InvalidTxnStatusException is thrown. Any other
exception can be retried.
### All exceptions are thrown
The Pulsar client doesn't handle any exceptions; it throws all of them to the user.
### Client handles the exception and retries creating the transaction
As long as the client is not closed and the broker has transactions enabled,
creating a transaction should not fail. The advantage is that no retryable
exception is thrown to the user while creating a transaction. The disadvantage
is that users have less flexibility in handling exceptions.
### Summary
When the client is closed, the broker has transactions disabled, or a
TransactionNotFoundException or InvalidTxnStatusException occurs, we throw the
exception to the user, who can then decide how to handle the transaction.
Users don't need to handle any other exceptions, because those have nothing to
do with how the user uses transactions. Therefore, if we encounter any other
exception, we should retry automatically on the client side, which greatly
simplifies the exception handling required of users.
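The split between exceptions surfaced to the user and exceptions retried inside the client could be sketched roughly as follows. This is only an illustration under assumptions, not the actual client code: the nested exception classes are local stand-ins for the Pulsar types named above, `ClientClosedException` and `TransactionDisabledException` are made-up names for the "client closed" and "broker has transactions disabled" cases, and the bounded `maxAttempts` is an assumption, since the proposal does not specify a retry limit or backoff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

final class TxnRetrySketch {
    // Local stand-ins for the exception types named in the proposal (assumptions).
    static class TransactionNotFoundException extends Exception {}
    static class InvalidTxnStatusException extends Exception {}
    // Hypothetical names for "client closed" and "broker has transactions disabled".
    static class ClientClosedException extends Exception {}
    static class TransactionDisabledException extends Exception {}

    // Strips the wrappers that CompletableFuture adds around the real cause.
    static Throwable unwrap(Throwable t) {
        while ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Everything except the four user-facing cases is treated as retryable.
    static boolean isRetryable(Throwable t) {
        Throwable cause = unwrap(t);
        return !(cause instanceof TransactionNotFoundException
                || cause instanceof InvalidTxnStatusException
                || cause instanceof ClientClosedException
                || cause instanceof TransactionDisabledException);
    }

    // Runs op, re-running it on retryable failures, at most maxAttempts times in total.
    static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> op, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(op, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> op, int attemptsLeft,
                                    CompletableFuture<T> result) {
        op.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptsLeft > 1 && isRetryable(error)) {
                attempt(op, attemptsLeft - 1, result);
            } else {
                // Non-retryable, or out of attempts: surface the real cause to the user.
                result.completeExceptionally(unwrap(error));
            }
        });
    }
}
```

With a helper of this shape, the user-facing future only ever fails with one of the four non-retryable cases or after the attempts are exhausted.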
## solve problem 2 : Handle produce and ack exception
The only exceptions that cannot be retried at all are client closed, broker
not enabling transactions, and the ack conflict exception
(TransactionConflictException).
### Exception wrap
We wrap all retryable exceptions and let the client make the decision, for
example `throw new TransactionOpFailException(e)`.
#### advantage
The user only needs to handle one exception. When a transaction op throws
this exception, the user can abort the transaction.
#### disadvantage
Users lose the flexibility to handle the real exception. For example, in some
contexts a produce or ack timeout could simply be retried.
### Don't wrap
#### advantage
The user can react differently to different exceptions.
#### disadvantage
The user may need to handle multiple exceptions with different processing,
which increases complexity. However, they can still simply abort whenever an
exception is thrown.
### Summary
I think we should not wrap exceptions. In some scenarios a lot of messages
are sent in one transaction; if a single timeout in the middle forces the
transaction to abort, the user has no chance to retry. That is inflexible and
makes for a poor user experience.
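Without wrapping, the caller sees the real cause and can decide per operation. The sketch below shows one possible user-side policy, assuming a timeout is retried and anything else leads to an abort. `Action` and `onOpFailure` are hypothetical names, and `TransactionConflictException` here is a local stand-in for the Pulsar exception of that name; the conservative default of aborting on unknown causes is a choice made for this example, not something the proposal prescribes.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

final class TxnOpFailureSketch {
    enum Action { RETRY_OP, ABORT_TXN }

    // Local stand-in for the ack conflict exception mentioned in the proposal (assumption).
    static class TransactionConflictException extends Exception {}

    // Decides what to do after a produce or ack inside a transaction has failed.
    static Action onOpFailure(Throwable error) {
        Throwable cause = error;
        // Strip the wrappers that CompletableFuture adds around the real cause.
        while ((cause instanceof CompletionException || cause instanceof ExecutionException)
                && cause.getCause() != null) {
            cause = cause.getCause();
        }
        if (cause instanceof TimeoutException) {
            // Deduplication (produce) or a repeated ack makes the retry safe.
            return Action.RETRY_OP;
        }
        // Conflicts and unknown causes: give up on this transaction.
        return Action.ABORT_TXN;
    }
}
```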
## solve problem 3 : Don't abort when user invokes commit
If the client aborts the transaction when the user invokes commit, users will
be very confused.
### API Changes
Changes to how users call the API:
#### Before
```
consumer.receiveAsync().thenAccept(message -> {
    try {
        pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS)
                .build()
                .thenAccept(txn -> {
                    producer.newMessage(txn).sendAsync();
                    // Pass the transaction so the ack is part of it.
                    consumer.acknowledgeAsync(message.getMessageId(), txn);
                    txn.commit();
                });
    } catch (PulsarClientException pulsarClientException) {
        pulsarClientException.printStackTrace();
    }
});
```
Users need to handle exceptions but don't know when they need to abort.
Users can't handle exceptions from producing or acking messages, because
commit itself checks that all ops (produce or ack) succeeded.
Users need to catch PulsarClientException even with the asynchronous API,
because pulsarClient.newTransaction checks whether the client has transactions
enabled.
#### After
```
consumer.receiveAsync().thenAccept(message -> {
    pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.SECONDS)
            .build()
            .thenAccept(txn -> {
                List<CompletableFuture<?>> list = new ArrayList<>();
                list.add(producer.newMessage(txn).sendAsync());
                list.add(consumer.acknowledgeAsync(message.getMessageId(), txn));
                FutureUtil.waitForAll(list).thenAccept(v -> {
                    txn.commit().thenAccept(f -> {
                        log.info("Transaction commit success! txnId : {}", txn.getTxnID());
                    }).exceptionally(e -> {
                        if (e.getCause() instanceof TransactionNotFoundException) {
                            log.error("Transaction has already ended! txnId : {}",
                                    txn.getTxnID(), e.getCause());
                        } else if (e.getCause() instanceof
                                TransactionCoordinatorClientException.InvalidTxnStatusException) {
                            log.info("Transaction is ending! txnId : {}, txnStatus : {}",
                                    txn.getTxnID(), ((TransactionImpl) txn).getState());
                        } else {
                            log.error("Client closed, or transactions disabled on client or broker",
                                    e.getCause());
                        }
                        return null;
                    });
                }).exceptionally(e -> {
                    // A produce or ack failed, so the user chooses to abort.
                    txn.abort();
                    return null;
                });
            }).exceptionally(e -> {
                log.error("Client closed, or transactions disabled on client or broker",
                        e.getCause());
                return null;
            });
});
```
As we can see, having users deal with the futures themselves makes the flow
very complicated. If users don't want to handle the exceptions thrown by
transaction ops themselves, we can provide a TransactionUtil for this case.
```
public class TransactionUtil {
    public static CompletableFuture<Void> prepareCommit(Transaction transaction) {
        if (!(transaction instanceof TransactionImpl)) {
            return FutureUtil.failedFuture(
                    new IllegalArgumentException("Only support transactionImpl!"));
        }
        return ((TransactionImpl) transaction).prepareCommit();
    }
}
```
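The proposal does not show `TransactionImpl.prepareCommit()` itself. One way it might behave, sketched here purely as an assumption, is to wait for every produce and ack issued under the transaction and fail if any of them failed, without committing or aborting. `PrepareCommitSketch`, `registerOp`, and `pendingOps` are hypothetical names, not Pulsar APIs.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class PrepareCommitSketch {
    // Futures of the produce/ack operations issued under this transaction.
    private final List<CompletableFuture<?>> pendingOps = new CopyOnWriteArrayList<>();

    // Hypothetical hook: called whenever a produce or ack is sent with this transaction.
    void registerOp(CompletableFuture<?> op) {
        pendingOps.add(op);
    }

    // Completes once every registered op has finished; completes exceptionally
    // if any op failed. It never commits or aborts, that stays with the caller.
    CompletableFuture<Void> prepareCommit() {
        return CompletableFuture.allOf(pendingOps.toArray(new CompletableFuture<?>[0]));
    }
}
```

The caller can then chain `commit()` on success and `abort()` on failure, which keeps the abort decision explicit in user code.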
If we use TransactionUtil, the code is:
```
consumer.receiveAsync().thenAccept(message -> {
    pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.SECONDS)
            .build()
            .thenAccept(txn -> {
                producer.newMessage(txn).sendAsync();
                consumer.acknowledgeAsync(message.getMessageId(), txn);
                TransactionUtil.prepareCommit(txn).thenAccept(v -> {
                    txn.commit().thenAccept(f -> {
                        log.info("Transaction commit success! txnId : {}", txn.getTxnID());
                    }).exceptionally(e -> {
                        if (e.getCause() instanceof TransactionNotFoundException) {
                            log.error("Transaction has already ended! txnId : {}",
                                    txn.getTxnID(), e.getCause());
                        } else if (e.getCause() instanceof
                                TransactionCoordinatorClientException.InvalidTxnStatusException) {
                            log.info("Transaction is ending! txnId : {}, txnStatus : {}",
                                    txn.getTxnID(), ((TransactionImpl) txn).getState());
                        } else {
                            log.error("Client closed, or transactions disabled on client or broker",
                                    e.getCause());
                        }
                        return null;
                    });
                }).exceptionally(e -> {
                    // A produce or ack failed, so the user chooses to abort.
                    txn.abort();
                    return null;
                });
            }).exceptionally(e -> {
                log.error("Client closed, or transactions disabled on client or broker",
                        e.getCause());
                return null;
            });
});
```
If we use the synchronous methods, the code looks like:
```
Message<byte[]> message = consumer.receive();
Transaction txn = null;
try {
    txn = pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.SECONDS)
            .build()
            .get();
    producer.newMessage(txn).sendAsync();
    consumer.acknowledgeAsync(message.getMessageId(), txn);
    try {
        TransactionUtil.prepareCommit(txn).get();
    } catch (InterruptedException | ExecutionException e) {
        // A produce or ack failed, so the user chooses to abort.
        txn.abort();
        return;
    }
    txn.commit().get();
} catch (InterruptedException | ExecutionException e) {
    if (txn != null) {
        if (e.getCause() instanceof TransactionNotFoundException) {
            log.error("Transaction has already ended! txnId : {}",
                    txn.getTxnID(), e.getCause());
        } else if (e.getCause() instanceof
                TransactionCoordinatorClientException.InvalidTxnStatusException) {
            log.info("Transaction is ending! txnId : {}, txnStatus : {}",
                    txn.getTxnID(), ((TransactionImpl) txn).getState());
        } else {
            log.error("Client closed, or transactions disabled on client or broker",
                    e.getCause());
        }
    } else {
        log.error("Client closed, or transactions disabled on client or broker",
                e.getCause());
    }
}
```
# Documentation Changes [Optional]
The example code at https://pulsar.incubator.apache.org/docs/en/txn-use/
should be updated.
# Implementation
Retry when creating and ending a transaction
When the client is closed, the broker has transactions disabled, or a
TransactionNotFoundException or InvalidTxnStatusException occurs, we throw.
Any other exception is retried.
Invoking commit will not abort
When the user invokes commit, we only commit.
Add TransactionUtil to simplify the user's steps
```
public class TransactionUtil {
    public static CompletableFuture<Void> prepareCommit(Transaction transaction) {
        if (!(transaction instanceof TransactionImpl)) {
            return FutureUtil.failedFuture(
                    new IllegalArgumentException("Only support transactionImpl!"));
        }
        return ((TransactionImpl) transaction).prepareCommit();
    }
}
```
# Compatibility
Invoking commit will no longer abort the transaction.
Users need to handle transaction op (produce or ack) failures themselves.
# Test Plan [Optional]
Test retries when creating and ending a transaction.
Test that commit does not check transaction ops.