This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git
The following commit(s) were added to refs/heads/main by this push:
new 41ff05a4cf9 [improve][doc]Add exception examples for transaction API
(#427)
41ff05a4cf9 is described below
commit 41ff05a4cf984231a9ebd8e2d4f00229fde83e05
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Feb 21 21:13:15 2023 +0800
[improve][doc]Add exception examples for transaction API (#427)
* [improve][doc]Add exception examples for transaction API
* remove cumulative ack
---
docs/txn-use.md | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 54 insertions(+)
diff --git a/docs/txn-use.md b/docs/txn-use.md
index 0ff82cb84ea..cfeb6cc208a 100644
--- a/docs/txn-use.md
+++ b/docs/txn-use.md
@@ -92,3 +92,57 @@ Consumer<byte[]> consumer = pulsarClient
.subscribe();
```
+[2] Example of using transactions to ack messages individually
+```java
+// resource prepare
+String sourceTopicName = "persistent://" + NAMESPACE1 + "/sourceTopic";
+String sinkTopicName = "persistent://" + NAMESPACE1 + "/sinkTopic";
+String subName = "shared-subscription";
+String producerName = "txn-message-producer";
+try {
+ @Cleanup
+ Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING)
+ .topic(sinkTopicName)
+ .producerName(producerName)
+ .create();
+ @Cleanup
+ Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(sourceTopicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ Message<String> message = null;
+ Transaction transaction = null;
+ while (true) {
+ try {
+ message = sourceConsumer.receive();
+ //Open a transaction to handle the received message
+ transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ //Do some things there
+ //Send message to another topic
+ sinkProducer.newMessage(transaction)
+ .value("handle message " + message.getValue())
+ .send();
+ //Ack the message that has been consumed
+ sourceConsumer.acknowledgeAsync(message.getMessageId(),
transaction).get();
+ //Commit the transaction
+ transaction.commit().get();
+ } catch (ExecutionException e) {
+ Throwable exception = e.getCause();
+ if (!(exception instanceof
PulsarClientException.TransactionConflictException)) {
+ //The message may not be handled, so we need to redeliver it
+ sourceConsumer.negativeAcknowledge(message);
+ }
+ if (!(exception instanceof
TransactionCoordinatorClientException.TransactionNotFoundException) &&
transaction !=null) {
+ //Abort the transaction if there is an exception and the
transaction is not end.
+ transaction.abort().get();
+ }
+ }
+ }
+} catch (Exception e) {
+ log.error("Catch Exception", e);
+}
+```