congbobo184 opened a new issue #13918:
URL: https://github.com/apache/pulsar/issues/13918


   Authors: @congbobo184 @codelipenghui 
   # Background
   More and more people and companies are testing and using transactions. However, the transaction API is not clearly defined. Users don't know which exceptions to catch, or when a transaction can be committed or aborted.
   # Motivation
   Let's look at how the transaction API is currently used:
   ```
   Message<String> message = consumer1.receive();
   try {
       Transaction transaction = client.newTransaction()
               .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
   
       consumer1.acknowledgeAsync(message.getMessageId(), transaction);
   
       producer.newMessage(transaction).sendAsync(message.getValue());
   
       transaction.commit().get();
   
   } catch (Exception ignored) {
       // Which of these exceptions can be retried, and when must we abort? The API doesn't say.
   }
   ```
   This is the current internal implementation of `commit()`:
   ```
       public CompletableFuture<Void> commit() {
           return checkIfOpenOrCommitting().thenCompose((value) -> {
               CompletableFuture<Void> commitFuture = new CompletableFuture<>();
               this.state = State.COMMITTING;
               allOpComplete().whenComplete((v, e) -> {
                   if (e != null) {
                       // an op (send or ack) failed: commit() aborts the transaction instead
                       abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
                   } else {
                       tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits))
                               .whenComplete((vx, ex) -> {
                                   if (ex != null) {
                                       if (ex instanceof TransactionNotFoundException
                                               || ex instanceof InvalidTxnStatusException) {
                                           this.state = State.ERROR;
                                       }
                                       commitFuture.completeExceptionally(ex);
                                   } else {
                                       this.state = State.COMMITTED;
                                       commitFuture.complete(vx);
                                   }
                               });
                   }
               });
               return commitFuture;
           });
       }
   ```
   This has the following problems:
   **Problem 1:** As the code above shows, exceptions can be thrown both when creating a transaction and when ending it. However, we don't know which exceptions each operation can throw, so we don't know whether to simply retry. If creating a transaction throws, should we create it again? If committing throws, should we commit again?
   
   **Problem 2:** As the internal commit code shows, a transaction can only be committed after all of its synchronous operations (ack with the transaction and produce with the transaction) have completed. But when an ack or produce fails with an exception such as a `TimeoutException`, we could simply ack or produce again without breaking the exactly-once semantics of the transaction. A re-produced message is handled by message deduplication, and an ack that failed with anything other than `TransactionConflictException` can be acked again.
   
   **Problem 3:** If a transaction op fails and the user then commits the transaction, the client aborts it. This confuses users: why does the client abort the transaction when I called commit?
   # Approach
   ## Solve problem 1: handle exceptions when creating and committing a transaction
   **New transaction:** The only cases where we are sure an exception must be thrown are when the Pulsar client is closed or the broker does not support transactions.
   **Commit or abort transaction:** The user needs to handle the case where the client is closed, the broker has transactions disabled, or a `TransactionNotFoundException` or `InvalidTxnStatusException` occurs. Any other exception can be retried.
   ### Throw all exceptions
   The Pulsar client doesn't handle any exceptions. Every exception is thrown to the user.
   ### The client handles the exception and retries creating the transaction
   As long as the client is not closed and the broker has transactions enabled, creating a transaction should not fail. The advantage is that no retryable exception is ever thrown while creating a transaction. The disadvantage is that users have less flexibility in handling exceptions.
   ### Summary
   In some cases the client throws the exception, and the user decides what to do with the transaction. These cases are: the client is closed, the broker has transactions disabled, or a `TransactionNotFoundException` or `InvalidTxnStatusException` occurs. Users don't need to handle any other exception, because those exceptions give them nothing to act on. The client should therefore retry automatically on any other exception. This greatly simplifies the exception handling users need to write when using transactions.
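Here is a minimal sketch of this client-side retry, written with plain JDK futures. The four exception classes are hypothetical stand-ins for the client's real types, such as `TransactionNotFoundException` and `InvalidTxnStatusException`. `TxnRetry`, `withRetry` and the `maxAttempts` bound are illustrative names. The proposal itself does not specify a retry limit or a backoff policy.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

// Hypothetical stand-ins for the four non-retryable conditions listed in the proposal.
class ClientClosedException extends RuntimeException {}
class TransactionDisabledException extends RuntimeException {}
class TxnNotFoundException extends RuntimeException {}
class TxnInvalidStatusException extends RuntimeException {}

class TxnRetry {
    // Strips the CompletionException/ExecutionException wrappers added by futures.
    static Throwable unwrap(Throwable t) {
        while ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Only the four conditions above reach the user; anything else is retried.
    static boolean isRetryable(Throwable t) {
        Throwable cause = unwrap(t);
        return !(cause instanceof ClientClosedException
                || cause instanceof TransactionDisabledException
                || cause instanceof TxnNotFoundException
                || cause instanceof TxnInvalidStatusException);
    }

    // Re-runs op (e.g. "new transaction" or "commit") until it succeeds, fails with a
    // non-retryable error, or maxAttempts is used up. No backoff, for brevity.
    static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> op, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(op, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> op, int attemptsLeft,
                                    CompletableFuture<T> result) {
        op.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptsLeft > 1 && isRetryable(error)) {
                attempt(op, attemptsLeft - 1, result);
            } else {
                result.completeExceptionally(unwrap(error));
            }
        });
    }
}
```

Wrap both the creation call and the commit call this way. The future returned to the user then fails only with one of the four conditions the user is expected to handle.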
   ## Solve problem 2: handle produce and ack exceptions
   Only three exceptions can never be retried: the client is closed, the broker does not have transactions enabled, and a conflict on ack (`TransactionConflictException`).
   ### Wrap exceptions
   Wrap all retryable exceptions in a single exception type, e.g. `throw new TransactionOpFailException(e)`, and let the application decide what to do.
   #### Advantage
   The user only needs to handle one exception. Whenever a transaction op throws it, the user can abort the transaction.
   #### Disadvantage
   Users have less flexibility in handling the real exceptions. In some contexts, for example, a produce or ack that timed out could simply be retried.
   ### Don't wrap exceptions
   #### Advantage
   The user can react differently to different exceptions.
   #### Disadvantage
   The user may need to handle several exceptions and process each one differently, which increases complexity. Users can still simply abort whenever an exception is thrown.
   ### Summary
   I think we should not wrap exceptions. In some scenarios a transaction sends many messages. A single timeout in the middle would then force the whole transaction to be aborted, without giving the user the chance to retry. That is inflexible and makes for a poor user experience.
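Because exceptions are not wrapped, the application can treat a timed-out send or ack differently from a conflict. Below is a minimal sketch. It uses the JDK `TimeoutException` as a stand-in for the client's own timeout exception type. It relies on message deduplication to make a re-sent message safe, as argued in the motivation. `OpRetry` and `retryOnTimeout` are hypothetical helpers, not client API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class OpRetry {
    // Retries one transactional send or ack when it timed out. Any other failure
    // (for example an ack conflict) is passed back so the caller can abort instead.
    static <T> CompletableFuture<T> retryOnTimeout(Supplier<CompletableFuture<T>> op,
                                                   int maxAttempts) {
        return op.get().<CompletableFuture<T>>handle((value, error) -> {
            if (error == null) {
                return CompletableFuture.completedFuture(value);
            }
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (cause instanceof TimeoutException && maxAttempts > 1) {
                return retryOnTimeout(op, maxAttempts - 1);
            }
            return CompletableFuture.failedFuture(cause);
        }).thenCompose(next -> next); // flatten the future-of-future produced by handle
    }
}
```

The retry loop stays in user code. The application therefore chooses per operation how many timeouts it tolerates before aborting the transaction.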
   ## Solve problem 3: don't abort when the user invokes commit
   Aborting a transaction when the user invokes commit is very confusing for users.
   ### API Changes
   How users use the API changes as follows:
   #### Before
   ```
           consumer.receiveAsync().thenAccept(message -> {
               try {
                   pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().thenAccept(txn -> {
                       producer.newMessage(txn).value(message.getValue()).sendAsync();
                       consumer.acknowledgeAsync(message.getMessageId(), txn);
                       // if the send or ack above failed, commit() aborts the transaction instead
                       txn.commit();
                   });
               } catch (PulsarClientException pulsarClientException) {
                   pulsarClientException.printStackTrace();
               }
           });
   ```
   - Users need to handle exceptions themselves and don't know when they have to abort.
   - Users can't handle exceptions from individual produce or ack operations, because commit itself checks that all ops (produce or ack) succeeded.
   - Users need to catch `PulsarClientException` even with the asynchronous API, because `pulsarClient.newTransaction()` checks whether the client has transactions enabled.
   #### After
   ```
           consumer.receiveAsync().thenAccept(message -> {
               pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().thenAccept(txn -> {
                   List<CompletableFuture<?>> list = new ArrayList<>();
                   list.add(producer.newMessage(txn).value(message.getValue()).sendAsync());
                   list.add(consumer.acknowledgeAsync(message.getMessageId(), txn));
                   // commit only after every send/ack has succeeded; otherwise abort
                   FutureUtil.waitForAll(list).thenAccept(v -> {
                       txn.commit().thenAccept(f -> {
                           log.info("Transaction committed! txnId: {}", txn.getTxnID());
                       }).exceptionally(e -> {
                           if (e.getCause() instanceof TransactionNotFoundException) {
                               log.error("Transaction has already ended! txnId: {}",
                                       txn.getTxnID(), e.getCause());
                           } else if (e.getCause() instanceof
                                   TransactionCoordinatorClientException.InvalidTxnStatusException) {
                               log.info("Transaction is ending! txnId: {}, txnStatus: {}",
                                       txn.getTxnID(), ((TransactionImpl) txn).getState());
                           } else {
                               log.error("Client closed, or transactions disabled on the client or broker",
                                       e.getCause());
                           }
                           return null;
                       });
                   }).exceptionally(e -> {
                       txn.abort();
                       return null;
                   });
               }).exceptionally(e -> {
                   log.error("Client closed, or transactions disabled on the client or broker",
                           e.getCause());
                   return null;
               });
           });
   ```
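Without the logging, the flow above follows one rule: commit only when every transactional op succeeded, otherwise abort. Below is a JDK-only sketch of that rule. `CommitFlow.commitOrAbort` is a hypothetical helper. `CompletableFuture.allOf` plays the role of `FutureUtil.waitForAll`. The two suppliers stand in for `txn.commit()` and `txn.abort()`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class CommitFlow {
    // Completes with true after a commit, or false after an abort. A failed
    // commit itself surfaces as an exceptional completion for the caller to inspect.
    static CompletableFuture<Boolean> commitOrAbort(List<CompletableFuture<?>> ops,
                                                    Supplier<CompletableFuture<Void>> commit,
                                                    Supplier<CompletableFuture<Void>> abort) {
        return CompletableFuture.allOf(ops.toArray(new CompletableFuture<?>[0]))
                // map "all ops done" to success/failure without propagating the op error
                .handle((ignored, opError) -> opError == null)
                .thenCompose(allOk -> {
                    if (allOk) {
                        return commit.get().thenApply(ignored -> true);
                    }
                    return abort.get().thenApply(ignored -> false);
                });
    }
}
```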
   As we can see, handling the futures by hand makes user code very complicated. For users who don't want to handle transaction-op exceptions themselves, we can provide a `TransactionUtil`:
   ```
   public class TransactionUtil {
   
       public static CompletableFuture<Void> prepareCommit(Transaction transaction) {
           if (!(transaction instanceof TransactionImpl)) {
               return FutureUtil.failedFuture(
                       new IllegalArgumentException("Only TransactionImpl is supported!"));
           }
           return ((TransactionImpl) transaction).prepareCommit();
       }
   
   }
   ```
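The commit code quoted in the Motivation already waits on `allOpComplete()` before committing. `prepareCommit()` presumably exposes that same wait to the user, without committing or aborting. Below is a hypothetical sketch of the bookkeeping this requires. `OpTracker` and `register` are illustrative names, not actual fields or methods of `TransactionImpl`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the op bookkeeping a transaction needs for prepareCommit().
class OpTracker {
    private final List<CompletableFuture<?>> pendingOps = new CopyOnWriteArrayList<>();

    // Called for every send/ack issued with this transaction; returns the same future.
    <T> CompletableFuture<T> register(CompletableFuture<T> op) {
        pendingOps.add(op);
        return op;
    }

    // Completes normally once every registered op succeeded, or exceptionally with a
    // CompletionException wrapping one of the failures. It never commits or aborts.
    CompletableFuture<Void> prepareCommit() {
        return CompletableFuture.allOf(pendingOps.toArray(new CompletableFuture<?>[0]));
    }
}
```

Because `prepareCommit()` only reports whether the ops succeeded, the decision to commit or abort stays with the caller, which is what problem 3 asks for.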
   With `TransactionUtil`, the code becomes:
   ```
           consumer.receiveAsync().thenAccept(message -> {
               pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().thenAccept(txn -> {
                   producer.newMessage(txn).value(message.getValue()).sendAsync();
                   consumer.acknowledgeAsync(message.getMessageId(), txn);
                   // prepareCommit() waits for the send and ack above without committing
                   TransactionUtil.prepareCommit(txn).thenAccept(v -> {
                       txn.commit().thenAccept(f -> {
                           log.info("Transaction committed! txnId: {}", txn.getTxnID());
                       }).exceptionally(e -> {
                           if (e.getCause() instanceof TransactionNotFoundException) {
                               log.error("Transaction has already ended! txnId: {}",
                                       txn.getTxnID(), e.getCause());
                           } else if (e.getCause() instanceof
                                   TransactionCoordinatorClientException.InvalidTxnStatusException) {
                               log.info("Transaction is ending! txnId: {}, txnStatus: {}",
                                       txn.getTxnID(), ((TransactionImpl) txn).getState());
                           } else {
                               log.error("Client closed, or transactions disabled on the client or broker",
                                       e.getCause());
                           }
                           return null;
                       });
                   }).exceptionally(e -> {
                       // a send or ack failed: abort instead of committing
                       txn.abort();
                       return null;
                   });
               }).exceptionally(e -> {
                   log.error("Client closed, or transactions disabled on the client or broker",
                           e.getCause());
                   return null;
               });
           });
   ```
   With the synchronous API, the code looks like this:
   ```
           Message<byte[]> message = consumer.receive();
           Transaction txn = null;
           try {
               txn = pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
               producer.newMessage(txn).value(message.getValue()).sendAsync();
               consumer.acknowledgeAsync(message.getMessageId(), txn);
               try {
                   // wait for the send and ack above; abort if either failed
                   TransactionUtil.prepareCommit(txn).get();
               } catch (InterruptedException | ExecutionException e) {
                   txn.abort();
                   return;
               }
               txn.commit().get();
           } catch (InterruptedException | ExecutionException e) {
               if (txn != null) {
                   if (e.getCause() instanceof TransactionNotFoundException) {
                       log.error("Transaction has already ended! txnId: {}", txn.getTxnID(), e.getCause());
                   } else if (e.getCause() instanceof
                           TransactionCoordinatorClientException.InvalidTxnStatusException) {
                       log.info("Transaction is ending! txnId: {}, txnStatus: {}",
                               txn.getTxnID(), ((TransactionImpl) txn).getState());
                   } else {
                       log.error("Client closed, or transactions disabled on the client or broker", e.getCause());
                   }
               } else {
                   log.error("Client closed, or transactions disabled on the client or broker", e.getCause());
               }
           }
   ```
   
   # Documentation Changes [Optional]
   The example code at https://pulsar.incubator.apache.org/docs/en/txn-use/ should be updated.
   # Implementation
   ### Retry when creating and ending a transaction
   Some exceptions are thrown to the user: the client is closed, the broker has transactions disabled, or a `TransactionNotFoundException` or `InvalidTxnStatusException` occurs. The client retries on any other exception.
   ### Invoking commit does not abort
   When the user invokes commit, the client only commits.
   ### Add `TransactionUtil` to simplify user code
   ```
   public class TransactionUtil {
   
       public static CompletableFuture<Void> prepareCommit(Transaction transaction) {
           if (!(transaction instanceof TransactionImpl)) {
               return FutureUtil.failedFuture(
                       new IllegalArgumentException("Only TransactionImpl is supported!"));
           }
           return ((TransactionImpl) transaction).prepareCommit();
       }
   }
   ```
   # Compatibility
   - Invoking commit no longer aborts the transaction.
   - Users need to handle failures of transaction ops (produce or ack) themselves.
   # Test Plan [Optional]
   - Test retries when creating and ending a transaction.
   - Test that commit does not check the transaction ops.

