This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
     new 41ff05a4cf9 [improve][doc]Add exception examples for transaction API (#427)
41ff05a4cf9 is described below

commit 41ff05a4cf984231a9ebd8e2d4f00229fde83e05
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Feb 21 21:13:15 2023 +0800

    [improve][doc]Add exception examples for transaction API (#427)
    
    * [improve][doc]Add exception examples for transaction API
    
    * remove cumulative ack
---
 docs/txn-use.md | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git a/docs/txn-use.md b/docs/txn-use.md
index 0ff82cb84ea..cfeb6cc208a 100644
--- a/docs/txn-use.md
+++ b/docs/txn-use.md
@@ -92,3 +92,57 @@ Consumer<byte[]> consumer = pulsarClient
     .subscribe();
 ```
 
+[2] Example of using transactions to ack messages individually
+```java
+// Prepare resources: source/sink topic names, subscription name, and producer name
+String sourceTopicName = "persistent://" + NAMESPACE1 + "/sourceTopic";
+String sinkTopicName = "persistent://" + NAMESPACE1 + "/sinkTopic";
+String subName = "shared-subscription";
+String producerName = "txn-message-producer";
+try {
+  @Cleanup
+  Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING)
+          .topic(sinkTopicName)
+          .producerName(producerName)
+          .create();
+  @Cleanup
+  Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING)
+          .topic(sourceTopicName)
+          .subscriptionName(subName)
+          .subscriptionType(SubscriptionType.Shared)
+          .subscribe();
+  while (true) {
+      Message<String> message = null;
+      Transaction transaction = null; //Reset per message so the catch block never aborts a previous, already ended transaction
+      try {
+          message = sourceConsumer.receive();
+          //Open a transaction to handle the received message
+          transaction = pulsarClient.newTransaction()
+                  .withTransactionTimeout(5, TimeUnit.SECONDS)
+                  .build()
+                  .get();
+          //Process the message here
+          //Send message to another topic
+          sinkProducer.newMessage(transaction)
+                  .value("handle message " + message.getValue())
+                  .send();
+          //Ack the message that has been consumed
+          sourceConsumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+          //Commit the transaction
+          transaction.commit().get();
+      } catch (ExecutionException e) {
+          Throwable exception = e.getCause();
+          if (!(exception instanceof PulsarClientException.TransactionConflictException)) {
+              //The message may not be handled, so we need to redeliver it
+              sourceConsumer.negativeAcknowledge(message);
+          }
+          if (!(exception instanceof TransactionCoordinatorClientException.TransactionNotFoundException) && transaction != null) {
+              //Abort the transaction if there is an exception and the transaction has not ended.
+              transaction.abort().get();
+          }
+      }
+  }
+} catch (Exception e) {
+  log.error("Catch Exception", e);
+}
+```

Reply via email to