This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
     new b17a5313ce99 [fix][doc] Fix code block format for transaction demo 
(#952)
b17a5313ce99 is described below

commit b17a5313ce99dfaede0adf26cba54b13c27ca48d
Author: Zike Yang <[email protected]>
AuthorDate: Thu Aug 15 22:20:58 2024 +0800

    [fix][doc] Fix code block format for transaction demo (#952)
    
    * Fix code block format for transaction demo
    
    * Push changes for old version
---
 docs/txn-use.md                         | 391 ++++++++++++++++----------------
 versioned_docs/version-3.0.x/txn-use.md | 391 ++++++++++++++++----------------
 versioned_docs/version-3.1.x/txn-use.md | 391 ++++++++++++++++----------------
 versioned_docs/version-3.2.x/txn-use.md | 391 ++++++++++++++++----------------
 versioned_docs/version-3.3.x/txn-use.md | 390 +++++++++++++++----------------
 5 files changed, 980 insertions(+), 974 deletions(-)

diff --git a/docs/txn-use.md b/docs/txn-use.md
index 193a03de4dbb..556d868b6c4c 100644
--- a/docs/txn-use.md
+++ b/docs/txn-use.md
@@ -5,6 +5,11 @@ sidebar_label: "Get started"
 description: Get started to use Pulsar transaction API.
 ---
 
+````mdx-code-block
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+````
+
 Pulsar transaction is primarily a server-side and protocol-level feature. This 
tutorial guides you through every step of how to use the [Pulsar transaction 
API](/api/admin/) to send and receive messages in a Java client.
 
 :::note
@@ -78,8 +83,6 @@ To use Pulsar transaction API, complete the following steps.
 
     :::
 
-    **Input**
-
 ````mdx-code-block
 <Tabs groupId="api-choice"
   defaultValue="Java"
@@ -87,205 +90,203 @@ To use Pulsar transaction API, complete the following 
steps.
 
 <TabItem value="Java">
 
-    ```java
-    PulsarClient client = PulsarClient.builder()
-                    // Step 3: create a Pulsar client and enable transactions.
-                    .enableTransaction(true)
-                    .serviceUrl(jct.serviceUrl)
-                    .build();
-
-            // Step 4: create three producers to produce messages to input and 
output topics.
-            ProducerBuilder<String> producerBuilder = 
client.newProducer(Schema.STRING);
-            Producer<String> inputProducer = producerBuilder.topic(inputTopic)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            Producer<String> outputProducerOne = 
producerBuilder.topic(outputTopicOne)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            Producer<String> outputProducerTwo = 
producerBuilder.topic(outputTopicTwo)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            // Step 4: create three consumers to consume messages from input 
and output topics.
-            Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
-            Consumer<String> outputConsumerOne = 
client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
-            Consumer<String> outputConsumerTwo = 
client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
-
-            int count = 2;
-            // Step 5: produce messages to input topics.
-            for (int i = 0; i < count; i++) {
-                inputProducer.send("Hello Pulsar! count : " + i);
-            }
-
-            // Step 5: consume messages and produce them to output topics with 
transactions.
-            for (int i = 0; i < count; i++) {
-
-                // Step 5: the consumer successfully receives messages.
-                Message<String> message = inputConsumer.receive();
-
-                // Step 6: create transactions.
-                // The transaction timeout is specified as 10 seconds.
-                // If the transaction is not committed within 10 seconds, the 
transaction is automatically aborted.
-                Transaction txn = null;
-                try {
-                    txn = client.newTransaction()
-                            .withTransactionTimeout(10, 
TimeUnit.SECONDS).build().get();
-                    // Step 6: you can process the received message with your 
use case and business logic.
-
-                    // Step 7: the producers produce messages to output topics 
with transactions
-                    outputProducerOne.newMessage(txn).value("Hello Pulsar! 
outputTopicOne count : " + i).send();
-                    outputProducerTwo.newMessage(txn).value("Hello Pulsar! 
outputTopicTwo count : " + i).send();
-
-                    // Step 7: the consumers acknowledge the input message 
with the transactions *individually*.
-                    inputConsumer.acknowledgeAsync(message.getMessageId(), 
txn).get();
-                    // Step 8: commit transactions.
-                    txn.commit().get();
-                } catch (ExecutionException e) {
-                    if (!(e.getCause() instanceof 
PulsarClientException.TransactionConflictException)) {
-                        // If TransactionConflictException is not thrown,
-                        // you need to redeliver or negativeAcknowledge this 
message,
-                        // or else this message will not be received again.
-                        inputConsumer.negativeAcknowledge(message);
-                    }
-
-                    // If a new transaction is created,
-                    // then the old transaction should be aborted.
-                    if (txn != null) {
-                        txn.abort();
-                    }
-                }
-            }
-
-            // Final result: consume messages from output topics and print 
them.
-            for (int i = 0; i < count; i++) {
-                Message<String> message =  outputConsumerOne.receive();
-                System.out.println("Receive transaction message: " + 
message.getValue());
-            }
-
-            for (int i = 0; i < count; i++) {
-                Message<String> message =  outputConsumerTwo.receive();
-                System.out.println("Receive transaction message: " + 
message.getValue());
-            }
+```java
+PulsarClient client = PulsarClient.builder()
+                // Step 3: create a Pulsar client and enable transactions.
+                .enableTransaction(true)
+                .serviceUrl(jct.serviceUrl)
+                .build();
+
+// Step 4: create three producers to produce messages to input and output 
topics.
+ProducerBuilder<String> producerBuilder = client.newProducer(Schema.STRING);
+Producer<String> inputProducer = producerBuilder.topic(inputTopic)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+Producer<String> outputProducerOne = producerBuilder.topic(outputTopicOne)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+Producer<String> outputProducerTwo = producerBuilder.topic(outputTopicTwo)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+// Step 4: create three consumers to consume messages from input and output 
topics.
+Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
+Consumer<String> outputConsumerOne = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
+Consumer<String> outputConsumerTwo = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
+
+int count = 2;
+// Step 5: produce messages to input topics.
+for (int i = 0; i < count; i++) {
+    inputProducer.send("Hello Pulsar! count : " + i);
+}
+
+// Step 5: consume messages and produce them to output topics with 
transactions.
+for (int i = 0; i < count; i++) {
+
+    // Step 5: the consumer successfully receives messages.
+    Message<String> message = inputConsumer.receive();
+
+    // Step 6: create transactions.
+    // The transaction timeout is specified as 10 seconds.
+    // If the transaction is not committed within 10 seconds, the transaction 
is automatically aborted.
+    Transaction txn = null;
+    try {
+        txn = client.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        // Step 6: you can process the received message with your use case and 
business logic.
+
+        // Step 7: the producers produce messages to output topics with 
transactions
+        outputProducerOne.newMessage(txn).value("Hello Pulsar! outputTopicOne 
count : " + i).send();
+        outputProducerTwo.newMessage(txn).value("Hello Pulsar! outputTopicTwo 
count : " + i).send();
+
+        // Step 7: the consumers acknowledge the input message with the 
transactions *individually*.
+        inputConsumer.acknowledgeAsync(message.getMessageId(), txn).get();
+        // Step 8: commit transactions.
+        txn.commit().get();
+    } catch (ExecutionException e) {
+        if (!(e.getCause() instanceof 
PulsarClientException.TransactionConflictException)) {
+            // If TransactionConflictException is not thrown,
+            // you need to redeliver or negativeAcknowledge this message,
+            // or else this message will not be received again.
+            inputConsumer.negativeAcknowledge(message);
+        }
+
+        // If a new transaction is created,
+        // then the old transaction should be aborted.
+        if (txn != null) {
+            txn.abort();
         }
     }
-    ```
+}
+
+// Final result: consume messages from output topics and print them.
+for (int i = 0; i < count; i++) {
+    Message<String> message =  outputConsumerOne.receive();
+    System.out.println("Receive transaction message: " + message.getValue());
+}
+
+for (int i = 0; i < count; i++) {
+    Message<String> message =  outputConsumerTwo.receive();
+    System.out.println("Receive transaction message: " + message.getValue());
+}
+```
 
 </TabItem>
 <TabItem value="Go">
 
-  ```go
-       // Step 3: create a Pulsar client and enable transactions.
-       client, err := pulsar.NewClient(pulsar.ClientOptions{
-               URL:               "<serviceUrl>",
-               EnableTransaction: true,
-       })
-       if err != nil {
-               log.Fatalf("create client fail, err = %v", err)
-       }
-       defer client.Close()
-       // Step 4: create three producers to produce messages to input and 
output topics.
-       inputTopic := "inputTopic"
-       outputTopicOne := "outputTopicOne"
-       outputTopicTwo := "outputTopicTwo"
-       subscriptionName := "your-subscription-name"
-       inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       inputTopic,
-               SendTimeout: 0,
-       })
-       defer inputProducer.Close()
-       outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       outputTopicOne,
-               SendTimeout: 0,
-       })
-       defer outputProducerOne.Close()
-       outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       outputTopicTwo,
-               SendTimeout: 0,
-       })
-       defer outputProducerTwo.Close()
-
-       // Step 4: create three consumers to consume messages from input and 
output topics.
-       inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            inputTopic,
-               SubscriptionName: subscriptionName,
-       })
-       defer inputConsumer.Close()
-       outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            outputTopicOne,
-               SubscriptionName: subscriptionName,
-       })
-       defer outputConsumerOne.Close()
-       outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            outputTopicTwo,
-               SubscriptionName: subscriptionName,
-       })
-       defer outputConsumerTwo.Close()
-
-       // Step 5: produce messages to input topics.
-       ctx := context.Background()
-       count := 2
-       for i := 0; i < count; i++ {
-               inputProducer.Send(ctx, &pulsar.ProducerMessage{
-                       Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", 
i)),
-               })
-       }
-       // Step 5: consume messages and produce them to output topics with 
transactions.
-       for i := 0; i < count; i++ {
-               // Step 5: the consumer successfully receives messages.
-               message, err := inputConsumer.Receive(ctx)
-               if err != nil {
-                       log.Printf("receive message from %s fail, err = %v", 
inputTopic, err)
-                       continue
-               }
-               // Step 6: create transactions.
-               // The transaction timeout is specified as 10 seconds.
-               // If the transaction is not committed within 10 seconds, the 
transaction is automatically aborted.
-               txn, err := client.NewTransaction(10 * time.Second)
-               if err != nil {
-                       log.Printf("create txn fail, err = %v", err)
-                       continue
-               }
-               // Step 6: you can process the received message with your use 
case and business logic.
-               // processMessage(message)
-               // Step 7: the producers produce messages to output topics with 
transactions
-               _, err = outputProducerOne.Send(context.Background(), 
&pulsar.ProducerMessage{
-                       Transaction: txn,
-                       Payload:     []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicOne count : %d", i)),
-               })
-               if err != nil {
-                       log.Printf("send to producerOne fail %v", err)
-                       txn.Abort(ctx)
-               }
-               _, err = outputProducerTwo.Send(context.Background(), 
&pulsar.ProducerMessage{
-                       Transaction: txn,
-                       Payload:     []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicTwo count : %d", i)),
-               })
-               if err != nil {
-                       log.Printf("send to producerTwo fail %v", err)
-                       txn.Abort(ctx)
-               }
-               // Step 7: the consumers acknowledge the input message with the 
transactions *individually*.
-               err = inputConsumer.AckWithTxn(message, txn)
-               if err != nil {
-                       log.Printf("ack message fail %v", err)
-                       txn.Abort(ctx)
-               }
-               // Step 8: commit transactions.
-               err = txn.Commit(ctx)
-               if err != nil {
-                       log.Printf("commit txn fail %v", err)
-               }
-       }
-
-       // Final result: consume messages from output topics and print them.
-       for i := 0; i < count; i++ {
-               message, _ := outputConsumerOne.Receive(ctx)
-               log.Printf("Receive transaction message: %s", 
string(message.Payload()))
-       }
-       for i := 0; i < count; i++ {
-               message, _ := outputConsumerTwo.Receive(ctx)
-               log.Printf("Receive transaction message: %s", 
string(message.Payload()))
-       }
-  ```
+```go
+// Step 3: create a Pulsar client and enable transactions.
+client, err := pulsar.NewClient(pulsar.ClientOptions{
+  URL:               "<serviceUrl>",
+  EnableTransaction: true,
+})
+if err != nil {
+  log.Fatalf("create client fail, err = %v", err)
+}
+defer client.Close()
+// Step 4: create three producers to produce messages to input and output 
topics.
+inputTopic := "inputTopic"
+outputTopicOne := "outputTopicOne"
+outputTopicTwo := "outputTopicTwo"
+subscriptionName := "your-subscription-name"
+inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       inputTopic,
+  SendTimeout: 0,
+})
+defer inputProducer.Close()
+outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       outputTopicOne,
+  SendTimeout: 0,
+})
+defer outputProducerOne.Close()
+outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       outputTopicTwo,
+  SendTimeout: 0,
+})
+defer outputProducerTwo.Close()
+
+// Step 4: create three consumers to consume messages from input and output 
topics.
+inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            inputTopic,
+  SubscriptionName: subscriptionName,
+})
+defer inputConsumer.Close()
+outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            outputTopicOne,
+  SubscriptionName: subscriptionName,
+})
+defer outputConsumerOne.Close()
+outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            outputTopicTwo,
+  SubscriptionName: subscriptionName,
+})
+defer outputConsumerTwo.Close()
+
+// Step 5: produce messages to input topics.
+ctx := context.Background()
+count := 2
+for i := 0; i < count; i++ {
+  inputProducer.Send(ctx, &pulsar.ProducerMessage{
+    Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)),
+  })
+}
+// Step 5: consume messages and produce them to output topics with 
transactions.
+for i := 0; i < count; i++ {
+  // Step 5: the consumer successfully receives messages.
+  message, err := inputConsumer.Receive(ctx)
+  if err != nil {
+    log.Printf("receive message from %s fail, err = %v", inputTopic, err)
+    continue
+  }
+  // Step 6: create transactions.
+  // The transaction timeout is specified as 10 seconds.
+  // If the transaction is not committed within 10 seconds, the transaction is 
automatically aborted.
+  txn, err := client.NewTransaction(10 * time.Second)
+  if err != nil {
+    log.Printf("create txn fail, err = %v", err)
+    continue
+  }
+  // Step 6: you can process the received message with your use case and 
business logic.
+  // processMessage(message)
+  // Step 7: the producers produce messages to output topics with transactions
+  _, err = outputProducerOne.Send(context.Background(), 
&pulsar.ProducerMessage{
+    Transaction: txn,
+    Payload:     []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", 
i)),
+  })
+  if err != nil {
+    log.Printf("send to producerOne fail %v", err)
+    txn.Abort(ctx)
+  }
+  _, err = outputProducerTwo.Send(context.Background(), 
&pulsar.ProducerMessage{
+    Transaction: txn,
+    Payload:     []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", 
i)),
+  })
+  if err != nil {
+    log.Printf("send to producerTwo fail %v", err)
+    txn.Abort(ctx)
+  }
+  // Step 7: the consumers acknowledge the input message with the transactions 
*individually*.
+  err = inputConsumer.AckWithTxn(message, txn)
+  if err != nil {
+    log.Printf("ack message fail %v", err)
+    txn.Abort(ctx)
+  }
+  // Step 8: commit transactions.
+  err = txn.Commit(ctx)
+  if err != nil {
+    log.Printf("commit txn fail %v", err)
+  }
+}
+
+// Final result: consume messages from output topics and print them.
+for i := 0; i < count; i++ {
+  message, _ := outputConsumerOne.Receive(ctx)
+  log.Printf("Receive transaction message: %s", string(message.Payload()))
+}
+for i := 0; i < count; i++ {
+  message, _ := outputConsumerTwo.Receive(ctx)
+  log.Printf("Receive transaction message: %s", string(message.Payload()))
+}
+```
 
 </TabItem>
 </Tabs>
diff --git a/versioned_docs/version-3.0.x/txn-use.md 
b/versioned_docs/version-3.0.x/txn-use.md
index 6886ea8a83ca..d48b824a5606 100644
--- a/versioned_docs/version-3.0.x/txn-use.md
+++ b/versioned_docs/version-3.0.x/txn-use.md
@@ -4,6 +4,11 @@ title: Get started
 sidebar_label: "Get started"
 ---
 
+````mdx-code-block
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+````
+
 Pulsar transaction is primarily a server-side and protocol-level feature. This 
tutorial guides you through every step of how to use the [Pulsar transaction 
API](/api/admin/) to send and receive messages in a Java client.
 
 :::note
@@ -73,8 +78,6 @@ Currently, [Pulsar transaction API](/api/admin/) is available 
in **Pulsar 2.8.0
 
     :::
 
-    **Input**
-
 ````mdx-code-block
 <Tabs groupId="api-choice"
   defaultValue="Java"
@@ -82,205 +85,203 @@ Currently, [Pulsar transaction API](/api/admin/) is 
available in **Pulsar 2.8.0
 
 <TabItem value="Java">
 
-    ```java
-    PulsarClient client = PulsarClient.builder()
-                    // Step 3: create a Pulsar client and enable transactions.
-                    .enableTransaction(true)
-                    .serviceUrl(jct.serviceUrl)
-                    .build();
-
-            // Step 4: create three producers to produce messages to input and 
output topics.
-            ProducerBuilder<String> producerBuilder = 
client.newProducer(Schema.STRING);
-            Producer<String> inputProducer = producerBuilder.topic(inputTopic)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            Producer<String> outputProducerOne = 
producerBuilder.topic(outputTopicOne)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            Producer<String> outputProducerTwo = 
producerBuilder.topic(outputTopicTwo)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            // Step 4: create three consumers to consume messages from input 
and output topics.
-            Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
-            Consumer<String> outputConsumerOne = 
client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
-            Consumer<String> outputConsumerTwo = 
client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
-
-            int count = 2;
-            // Step 5: produce messages to input topics.
-            for (int i = 0; i < count; i++) {
-                inputProducer.send("Hello Pulsar! count : " + i);
-            }
-
-            // Step 5: consume messages and produce them to output topics with 
transactions.
-            for (int i = 0; i < count; i++) {
-
-                // Step 5: the consumer successfully receives messages.
-                Message<String> message = inputConsumer.receive();
-
-                // Step 6: create transactions.
-                // The transaction timeout is specified as 10 seconds.
-                // If the transaction is not committed within 10 seconds, the 
transaction is automatically aborted.
-                Transaction txn = null;
-                try {
-                    txn = client.newTransaction()
-                            .withTransactionTimeout(10, 
TimeUnit.SECONDS).build().get();
-                    // Step 6: you can process the received message with your 
use case and business logic.
-
-                    // Step 7: the producers produce messages to output topics 
with transactions
-                    outputProducerOne.newMessage(txn).value("Hello Pulsar! 
outputTopicOne count : " + i).send();
-                    outputProducerTwo.newMessage(txn).value("Hello Pulsar! 
outputTopicTwo count : " + i).send();
-
-                    // Step 7: the consumers acknowledge the input message 
with the transactions *individually*.
-                    inputConsumer.acknowledgeAsync(message.getMessageId(), 
txn).get();
-                    // Step 8: commit transactions.
-                    txn.commit().get();
-                } catch (ExecutionException e) {
-                    if (!(e.getCause() instanceof 
PulsarClientException.TransactionConflictException)) {
-                        // If TransactionConflictException is not thrown,
-                        // you need to redeliver or negativeAcknowledge this 
message,
-                        // or else this message will not be received again.
-                        inputConsumer.negativeAcknowledge(message);
-                    }
-
-                    // If a new transaction is created,
-                    // then the old transaction should be aborted.
-                    if (txn != null) {
-                        txn.abort();
-                    }
-                }
-            }
-
-            // Final result: consume messages from output topics and print 
them.
-            for (int i = 0; i < count; i++) {
-                Message<String> message =  outputConsumerOne.receive();
-                System.out.println("Receive transaction message: " + 
message.getValue());
-            }
-
-            for (int i = 0; i < count; i++) {
-                Message<String> message =  outputConsumerTwo.receive();
-                System.out.println("Receive transaction message: " + 
message.getValue());
-            }
+```java
+PulsarClient client = PulsarClient.builder()
+                // Step 3: create a Pulsar client and enable transactions.
+                .enableTransaction(true)
+                .serviceUrl(jct.serviceUrl)
+                .build();
+
+// Step 4: create three producers to produce messages to input and output 
topics.
+ProducerBuilder<String> producerBuilder = client.newProducer(Schema.STRING);
+Producer<String> inputProducer = producerBuilder.topic(inputTopic)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+Producer<String> outputProducerOne = producerBuilder.topic(outputTopicOne)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+Producer<String> outputProducerTwo = producerBuilder.topic(outputTopicTwo)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+// Step 4: create three consumers to consume messages from input and output 
topics.
+Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
+Consumer<String> outputConsumerOne = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
+Consumer<String> outputConsumerTwo = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
+
+int count = 2;
+// Step 5: produce messages to input topics.
+for (int i = 0; i < count; i++) {
+    inputProducer.send("Hello Pulsar! count : " + i);
+}
+
+// Step 5: consume messages and produce them to output topics with 
transactions.
+for (int i = 0; i < count; i++) {
+
+    // Step 5: the consumer successfully receives messages.
+    Message<String> message = inputConsumer.receive();
+
+    // Step 6: create transactions.
+    // The transaction timeout is specified as 10 seconds.
+    // If the transaction is not committed within 10 seconds, the transaction 
is automatically aborted.
+    Transaction txn = null;
+    try {
+        txn = client.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        // Step 6: you can process the received message with your use case and 
business logic.
+
+        // Step 7: the producers produce messages to output topics with 
transactions
+        outputProducerOne.newMessage(txn).value("Hello Pulsar! outputTopicOne 
count : " + i).send();
+        outputProducerTwo.newMessage(txn).value("Hello Pulsar! outputTopicTwo 
count : " + i).send();
+
+        // Step 7: the consumers acknowledge the input message with the 
transactions *individually*.
+        inputConsumer.acknowledgeAsync(message.getMessageId(), txn).get();
+        // Step 8: commit transactions.
+        txn.commit().get();
+    } catch (ExecutionException e) {
+        if (!(e.getCause() instanceof 
PulsarClientException.TransactionConflictException)) {
+            // If TransactionConflictException is not thrown,
+            // you need to redeliver or negativeAcknowledge this message,
+            // or else this message will not be received again.
+            inputConsumer.negativeAcknowledge(message);
+        }
+
+        // If a new transaction is created,
+        // then the old transaction should be aborted.
+        if (txn != null) {
+            txn.abort();
         }
     }
-    ```
+}
+
+// Final result: consume messages from output topics and print them.
+for (int i = 0; i < count; i++) {
+    Message<String> message =  outputConsumerOne.receive();
+    System.out.println("Receive transaction message: " + message.getValue());
+}
+
+for (int i = 0; i < count; i++) {
+    Message<String> message =  outputConsumerTwo.receive();
+    System.out.println("Receive transaction message: " + message.getValue());
+}
+```
 
 </TabItem>
 <TabItem value="Go">
 
-  ```go
-  // Step 3: create a Pulsar client and enable transactions.
-       client, err := pulsar.NewClient(pulsar.ClientOptions{
-               URL:               "<serviceUrl>",
-               EnableTransaction: true,
-       })
-       if err != nil {
-               log.Fatalf("create client fail, err = %v", err)
-       }
-       defer client.Close()
-       // Step 4: create three producers to produce messages to input and 
output topics.
-       inputTopic := "inputTopic"
-       outputTopicOne := "outputTopicOne"
-       outputTopicTwo := "outputTopicTwo"
-       subscriptionName := "your-subscription-name"
-       inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       inputTopic,
-               SendTimeout: 0,
-       })
-       defer inputProducer.Close()
-       outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       outputTopicOne,
-               SendTimeout: 0,
-       })
-       defer outputProducerOne.Close()
-       outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       outputTopicTwo,
-               SendTimeout: 0,
-       })
-       defer outputProducerTwo.Close()
-
-       // Step 4: create three consumers to consume messages from input and 
output topics.
-       inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            inputTopic,
-               SubscriptionName: subscriptionName,
-       })
-       defer inputConsumer.Close()
-       outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            outputTopicOne,
-               SubscriptionName: subscriptionName,
-       })
-       defer outputConsumerOne.Close()
-       outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            outputTopicTwo,
-               SubscriptionName: subscriptionName,
-       })
-       defer outputConsumerTwo.Close()
-
-       // Step 5: produce messages to input topics.
-       ctx := context.Background()
-       count := 2
-       for i := 0; i < count; i++ {
-               inputProducer.Send(ctx, &pulsar.ProducerMessage{
-                       Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", 
i)),
-               })
-       }
-       // Step 5: consume messages and produce them to output topics with 
transactions.
-       for i := 0; i < count; i++ {
-               // Step 5: the consumer successfully receives messages.
-               message, err := inputConsumer.Receive(ctx)
-               if err != nil {
-                       log.Printf("receive message from %s fail, err = %v", 
inputTopic, err)
-                       continue
-               }
-               // Step 6: create transactions.
-               // The transaction timeout is specified as 10 seconds.
-               // If the transaction is not committed within 10 seconds, the 
transaction is automatically aborted.
-               txn, err := client.NewTransaction(10 * time.Second)
-               if err != nil {
-                       log.Printf("create txn fail, err = %v", err)
-                       continue
-               }
-               // Step 6: you can process the received message with your use 
case and business logic.
-               // processMessage(message)
-               // Step 7: the producers produce messages to output topics with 
transactions
-               _, err = outputProducerOne.Send(context.Background(), 
&pulsar.ProducerMessage{
-                       Transaction: txn,
-                       Payload:     []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicOne count : %d", i)),
-               })
-               if err != nil {
-                       log.Printf("send to producerOne fail %v", err)
-                       txn.Abort(ctx)
-               }
-               _, err = outputProducerTwo.Send(context.Background(), 
&pulsar.ProducerMessage{
-                       Transaction: txn,
-                       Payload:     []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicTwo count : %d", i)),
-               })
-               if err != nil {
-                       log.Printf("send to producerTwo fail %v", err)
-                       txn.Abort(ctx)
-               }
-               // Step 7: the consumers acknowledge the input message with the 
transactions *individually*.
-               err = inputConsumer.AckWithTxn(message, txn)
-               if err != nil {
-                       log.Printf("ack message fail %v", err)
-                       txn.Abort(ctx)
-               }
-               // Step 8: commit transactions.
-               err = txn.Commit(ctx)
-               if err != nil {
-                       log.Printf("commit txn fail %v", err)
-               }
-       }
-
-       // Final result: consume messages from output topics and print them.
-       for i := 0; i < count; i++ {
-               message, _ := outputConsumerOne.Receive(ctx)
-               log.Printf("Receive transaction message: %s", 
string(message.Payload()))
-       }
-       for i := 0; i < count; i++ {
-               message, _ := outputConsumerTwo.Receive(ctx)
-               log.Printf("Receive transaction message: %s", 
string(message.Payload()))
-       }
-  ```
+```go
+// Step 3: create a Pulsar client and enable transactions.
+client, err := pulsar.NewClient(pulsar.ClientOptions{
+  URL:               "<serviceUrl>",
+  EnableTransaction: true,
+})
+if err != nil {
+  log.Fatalf("create client fail, err = %v", err)
+}
+defer client.Close()
+// Step 4: create three producers to produce messages to input and output 
topics.
+inputTopic := "inputTopic"
+outputTopicOne := "outputTopicOne"
+outputTopicTwo := "outputTopicTwo"
+subscriptionName := "your-subscription-name"
+inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       inputTopic,
+  SendTimeout: 0,
+})
+defer inputProducer.Close()
+outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       outputTopicOne,
+  SendTimeout: 0,
+})
+defer outputProducerOne.Close()
+outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       outputTopicTwo,
+  SendTimeout: 0,
+})
+defer outputProducerTwo.Close()
+
+// Step 4: create three consumers to consume messages from input and output 
topics.
+inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            inputTopic,
+  SubscriptionName: subscriptionName,
+})
+defer inputConsumer.Close()
+outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            outputTopicOne,
+  SubscriptionName: subscriptionName,
+})
+defer outputConsumerOne.Close()
+outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            outputTopicTwo,
+  SubscriptionName: subscriptionName,
+})
+defer outputConsumerTwo.Close()
+
+// Step 5: produce messages to input topics.
+ctx := context.Background()
+count := 2
+for i := 0; i < count; i++ {
+  inputProducer.Send(ctx, &pulsar.ProducerMessage{
+    Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)),
+  })
+}
+// Step 5: consume messages and produce them to output topics with 
transactions.
+for i := 0; i < count; i++ {
+  // Step 5: the consumer successfully receives messages.
+  message, err := inputConsumer.Receive(ctx)
+  if err != nil {
+    log.Printf("receive message from %s fail, err = %v", inputTopic, err)
+    continue
+  }
+  // Step 6: create transactions.
+  // The transaction timeout is specified as 10 seconds.
+  // If the transaction is not committed within 10 seconds, the transaction is 
automatically aborted.
+  txn, err := client.NewTransaction(10 * time.Second)
+  if err != nil {
+    log.Printf("create txn fail, err = %v", err)
+    continue
+  }
+  // Step 6: you can process the received message with your use case and 
business logic.
+  // processMessage(message)
+  // Step 7: the producers produce messages to output topics with transactions
+  _, err = outputProducerOne.Send(context.Background(), 
&pulsar.ProducerMessage{
+    Transaction: txn,
+    Payload:     []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", 
i)),
+  })
+  if err != nil {
+    log.Printf("send to producerOne fail %v", err)
+    txn.Abort(ctx)
+  }
+  _, err = outputProducerTwo.Send(context.Background(), 
&pulsar.ProducerMessage{
+    Transaction: txn,
+    Payload:     []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", 
i)),
+  })
+  if err != nil {
+    log.Printf("send to producerTwo fail %v", err)
+    txn.Abort(ctx)
+  }
+  // Step 7: the consumers acknowledge the input message with the transactions 
*individually*.
+  err = inputConsumer.AckWithTxn(message, txn)
+  if err != nil {
+    log.Printf("ack message fail %v", err)
+    txn.Abort(ctx)
+  }
+  // Step 8: commit transactions.
+  err = txn.Commit(ctx)
+  if err != nil {
+    log.Printf("commit txn fail %v", err)
+  }
+}
+
+// Final result: consume messages from output topics and print them.
+for i := 0; i < count; i++ {
+  message, _ := outputConsumerOne.Receive(ctx)
+  log.Printf("Receive transaction message: %s", string(message.Payload()))
+}
+for i := 0; i < count; i++ {
+  message, _ := outputConsumerTwo.Receive(ctx)
+  log.Printf("Receive transaction message: %s", string(message.Payload()))
+}
+```
 
 </TabItem>
 </Tabs>
diff --git a/versioned_docs/version-3.1.x/txn-use.md 
b/versioned_docs/version-3.1.x/txn-use.md
index 48cc5a840aaa..d48b824a5606 100644
--- a/versioned_docs/version-3.1.x/txn-use.md
+++ b/versioned_docs/version-3.1.x/txn-use.md
@@ -4,6 +4,11 @@ title: Get started
 sidebar_label: "Get started"
 ---
 
+````mdx-code-block
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+````
+
 Pulsar transaction is primarily a server-side and protocol-level feature. This 
tutorial guides you through every step of how to use the [Pulsar transaction 
API](/api/admin/) to send and receive messages in a Java client.
 
 :::note
@@ -73,8 +78,6 @@ Currently, [Pulsar transaction API](/api/admin/) is available 
in **Pulsar 2.8.0
 
     :::
 
-    **Input**
-
 ````mdx-code-block
 <Tabs groupId="api-choice"
   defaultValue="Java"
@@ -82,205 +85,203 @@ Currently, [Pulsar transaction API](/api/admin/) is 
available in **Pulsar 2.8.0
 
 <TabItem value="Java">
 
-    ```java
-    PulsarClient client = PulsarClient.builder()
-                    // Step 3: create a Pulsar client and enable transactions.
-                    .enableTransaction(true)
-                    .serviceUrl(jct.serviceUrl)
-                    .build();
-
-            // Step 4: create three producers to produce messages to input and 
output topics.
-            ProducerBuilder<String> producerBuilder = 
client.newProducer(Schema.STRING);
-            Producer<String> inputProducer = producerBuilder.topic(inputTopic)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            Producer<String> outputProducerOne = 
producerBuilder.topic(outputTopicOne)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            Producer<String> outputProducerTwo = 
producerBuilder.topic(outputTopicTwo)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            // Step 4: create three consumers to consume messages from input 
and output topics.
-            Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
-            Consumer<String> outputConsumerOne = 
client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
-            Consumer<String> outputConsumerTwo = 
client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
-
-            int count = 2;
-            // Step 5: produce messages to input topics.
-            for (int i = 0; i < count; i++) {
-                inputProducer.send("Hello Pulsar! count : " + i);
-            }
-
-            // Step 5: consume messages and produce them to output topics with 
transactions.
-            for (int i = 0; i < count; i++) {
-
-                // Step 5: the consumer successfully receives messages.
-                Message<String> message = inputConsumer.receive();
-
-                // Step 6: create transactions.
-                // The transaction timeout is specified as 10 seconds.
-                // If the transaction is not committed within 10 seconds, the 
transaction is automatically aborted.
-                Transaction txn = null;
-                try {
-                    txn = client.newTransaction()
-                            .withTransactionTimeout(10, 
TimeUnit.SECONDS).build().get();
-                    // Step 6: you can process the received message with your 
use case and business logic.
-
-                    // Step 7: the producers produce messages to output topics 
with transactions
-                    outputProducerOne.newMessage(txn).value("Hello Pulsar! 
outputTopicOne count : " + i).send();
-                    outputProducerTwo.newMessage(txn).value("Hello Pulsar! 
outputTopicTwo count : " + i).send();
-
-                    // Step 7: the consumers acknowledge the input message 
with the transactions *individually*.
-                    inputConsumer.acknowledgeAsync(message.getMessageId(), 
txn).get();
-                    // Step 8: commit transactions.
-                    txn.commit().get();
-                } catch (ExecutionException e) {
-                    if (!(e.getCause() instanceof 
PulsarClientException.TransactionConflictException)) {
-                        // If TransactionConflictException is not thrown,
-                        // you need to redeliver or negativeAcknowledge this 
message,
-                        // or else this message will not be received again.
-                        inputConsumer.negativeAcknowledge(message);
-                    }
-
-                    // If a new transaction is created,
-                    // then the old transaction should be aborted.
-                    if (txn != null) {
-                        txn.abort();
-                    }
-                }
-            }
-
-            // Final result: consume messages from output topics and print 
them.
-            for (int i = 0; i < count; i++) {
-                Message<String> message =  outputConsumerOne.receive();
-                System.out.println("Receive transaction message: " + 
message.getValue());
-            }
-
-            for (int i = 0; i < count; i++) {
-                Message<String> message =  outputConsumerTwo.receive();
-                System.out.println("Receive transaction message: " + 
message.getValue());
-            }
+```java
+PulsarClient client = PulsarClient.builder()
+                // Step 3: create a Pulsar client and enable transactions.
+                .enableTransaction(true)
+                .serviceUrl(jct.serviceUrl)
+                .build();
+
+// Step 4: create three producers to produce messages to input and output 
topics.
+ProducerBuilder<String> producerBuilder = client.newProducer(Schema.STRING);
+Producer<String> inputProducer = producerBuilder.topic(inputTopic)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+Producer<String> outputProducerOne = producerBuilder.topic(outputTopicOne)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+Producer<String> outputProducerTwo = producerBuilder.topic(outputTopicTwo)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+// Step 4: create three consumers to consume messages from input and output 
topics.
+Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
+Consumer<String> outputConsumerOne = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
+Consumer<String> outputConsumerTwo = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
+
+int count = 2;
+// Step 5: produce messages to input topics.
+for (int i = 0; i < count; i++) {
+    inputProducer.send("Hello Pulsar! count : " + i);
+}
+
+// Step 5: consume messages and produce them to output topics with 
transactions.
+for (int i = 0; i < count; i++) {
+
+    // Step 5: the consumer successfully receives messages.
+    Message<String> message = inputConsumer.receive();
+
+    // Step 6: create transactions.
+    // The transaction timeout is specified as 10 seconds.
+    // If the transaction is not committed within 10 seconds, the transaction 
is automatically aborted.
+    Transaction txn = null;
+    try {
+        txn = client.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        // Step 6: you can process the received message with your use case and 
business logic.
+
+        // Step 7: the producers produce messages to output topics with 
transactions
+        outputProducerOne.newMessage(txn).value("Hello Pulsar! outputTopicOne 
count : " + i).send();
+        outputProducerTwo.newMessage(txn).value("Hello Pulsar! outputTopicTwo 
count : " + i).send();
+
+        // Step 7: the consumers acknowledge the input message with the 
transactions *individually*.
+        inputConsumer.acknowledgeAsync(message.getMessageId(), txn).get();
+        // Step 8: commit transactions.
+        txn.commit().get();
+    } catch (ExecutionException e) {
+        if (!(e.getCause() instanceof 
PulsarClientException.TransactionConflictException)) {
+            // If TransactionConflictException is not thrown,
+            // you need to redeliver or negativeAcknowledge this message,
+            // or else this message will not be received again.
+            inputConsumer.negativeAcknowledge(message);
+        }
+
+        // If a new transaction is created,
+        // then the old transaction should be aborted.
+        if (txn != null) {
+            txn.abort();
         }
     }
-    ```
+}
+
+// Final result: consume messages from output topics and print them.
+for (int i = 0; i < count; i++) {
+    Message<String> message =  outputConsumerOne.receive();
+    System.out.println("Receive transaction message: " + message.getValue());
+}
+
+for (int i = 0; i < count; i++) {
+    Message<String> message =  outputConsumerTwo.receive();
+    System.out.println("Receive transaction message: " + message.getValue());
+}
+```
 
 </TabItem>
 <TabItem value="Go">
 
-  ```go
-       // Step 3: create a Pulsar client and enable transactions.
-       client, err := pulsar.NewClient(pulsar.ClientOptions{
-               URL:               "<serviceUrl>",
-               EnableTransaction: true,
-       })
-       if err != nil {
-               log.Fatalf("create client fail, err = %v", err)
-       }
-       defer client.Close()
-       // Step 4: create three producers to produce messages to input and 
output topics.
-       inputTopic := "inputTopic"
-       outputTopicOne := "outputTopicOne"
-       outputTopicTwo := "outputTopicTwo"
-       subscriptionName := "your-subscription-name"
-       inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       inputTopic,
-               SendTimeout: 0,
-       })
-       defer inputProducer.Close()
-       outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       outputTopicOne,
-               SendTimeout: 0,
-       })
-       defer outputProducerOne.Close()
-       outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       outputTopicTwo,
-               SendTimeout: 0,
-       })
-       defer outputProducerTwo.Close()
-
-       // Step 4: create three consumers to consume messages from input and 
output topics.
-       inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            inputTopic,
-               SubscriptionName: subscriptionName,
-       })
-       defer inputConsumer.Close()
-       outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            outputTopicOne,
-               SubscriptionName: subscriptionName,
-       })
-       defer outputConsumerOne.Close()
-       outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            outputTopicTwo,
-               SubscriptionName: subscriptionName,
-       })
-       defer outputConsumerTwo.Close()
-
-       // Step 5: produce messages to input topics.
-       ctx := context.Background()
-       count := 2
-       for i := 0; i < count; i++ {
-               inputProducer.Send(ctx, &pulsar.ProducerMessage{
-                       Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", 
i)),
-               })
-       }
-       // Step 5: consume messages and produce them to output topics with 
transactions.
-       for i := 0; i < count; i++ {
-               // Step 5: the consumer successfully receives messages.
-               message, err := inputConsumer.Receive(ctx)
-               if err != nil {
-                       log.Printf("receive message from %s fail, err = %v", 
inputTopic, err)
-                       continue
-               }
-               // Step 6: create transactions.
-               // The transaction timeout is specified as 10 seconds.
-               // If the transaction is not committed within 10 seconds, the 
transaction is automatically aborted.
-               txn, err := client.NewTransaction(10 * time.Second)
-               if err != nil {
-                       log.Printf("create txn fail, err = %v", err)
-                       continue
-               }
-               // Step 6: you can process the received message with your use 
case and business logic.
-               // processMessage(message)
-               // Step 7: the producers produce messages to output topics with 
transactions
-               _, err = outputProducerOne.Send(context.Background(), 
&pulsar.ProducerMessage{
-                       Transaction: txn,
-                       Payload:     []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicOne count : %d", i)),
-               })
-               if err != nil {
-                       log.Printf("send to producerOne fail %v", err)
-                       txn.Abort(ctx)
-               }
-               _, err = outputProducerTwo.Send(context.Background(), 
&pulsar.ProducerMessage{
-                       Transaction: txn,
-                       Payload:     []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicTwo count : %d", i)),
-               })
-               if err != nil {
-                       log.Printf("send to producerTwo fail %v", err)
-                       txn.Abort(ctx)
-               }
-               // Step 7: the consumers acknowledge the input message with the 
transactions *individually*.
-               err = inputConsumer.AckWithTxn(message, txn)
-               if err != nil {
-                       log.Printf("ack message fail %v", err)
-                       txn.Abort(ctx)
-               }
-               // Step 8: commit transactions.
-               err = txn.Commit(ctx)
-               if err != nil {
-                       log.Printf("commit txn fail %v", err)
-               }
-       }
-
-       // Final result: consume messages from output topics and print them.
-       for i := 0; i < count; i++ {
-               message, _ := outputConsumerOne.Receive(ctx)
-               log.Printf("Receive transaction message: %s", 
string(message.Payload()))
-       }
-       for i := 0; i < count; i++ {
-               message, _ := outputConsumerTwo.Receive(ctx)
-               log.Printf("Receive transaction message: %s", 
string(message.Payload()))
-       }
-  ```
+```go
+// Step 3: create a Pulsar client and enable transactions.
+client, err := pulsar.NewClient(pulsar.ClientOptions{
+  URL:               "<serviceUrl>",
+  EnableTransaction: true,
+})
+if err != nil {
+  log.Fatalf("create client fail, err = %v", err)
+}
+defer client.Close()
+// Step 4: create three producers to produce messages to input and output 
topics.
+inputTopic := "inputTopic"
+outputTopicOne := "outputTopicOne"
+outputTopicTwo := "outputTopicTwo"
+subscriptionName := "your-subscription-name"
+inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       inputTopic,
+  SendTimeout: 0,
+})
+defer inputProducer.Close()
+outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       outputTopicOne,
+  SendTimeout: 0,
+})
+defer outputProducerOne.Close()
+outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       outputTopicTwo,
+  SendTimeout: 0,
+})
+defer outputProducerTwo.Close()
+
+// Step 4: create three consumers to consume messages from input and output 
topics.
+inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            inputTopic,
+  SubscriptionName: subscriptionName,
+})
+defer inputConsumer.Close()
+outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            outputTopicOne,
+  SubscriptionName: subscriptionName,
+})
+defer outputConsumerOne.Close()
+outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            outputTopicTwo,
+  SubscriptionName: subscriptionName,
+})
+defer outputConsumerTwo.Close()
+
+// Step 5: produce messages to input topics.
+ctx := context.Background()
+count := 2
+for i := 0; i < count; i++ {
+  inputProducer.Send(ctx, &pulsar.ProducerMessage{
+    Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)),
+  })
+}
+// Step 5: consume messages and produce them to output topics with 
transactions.
+for i := 0; i < count; i++ {
+  // Step 5: the consumer successfully receives messages.
+  message, err := inputConsumer.Receive(ctx)
+  if err != nil {
+    log.Printf("receive message from %s fail, err = %v", inputTopic, err)
+    continue
+  }
+  // Step 6: create transactions.
+  // The transaction timeout is specified as 10 seconds.
+  // If the transaction is not committed within 10 seconds, the transaction is 
automatically aborted.
+  txn, err := client.NewTransaction(10 * time.Second)
+  if err != nil {
+    log.Printf("create txn fail, err = %v", err)
+    continue
+  }
+  // Step 6: you can process the received message with your use case and 
business logic.
+  // processMessage(message)
+  // Step 7: the producers produce messages to output topics with transactions
+  _, err = outputProducerOne.Send(context.Background(), 
&pulsar.ProducerMessage{
+    Transaction: txn,
+    Payload:     []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", 
i)),
+  })
+  if err != nil {
+    log.Printf("send to producerOne fail %v", err)
+    txn.Abort(ctx)
+  }
+  _, err = outputProducerTwo.Send(context.Background(), 
&pulsar.ProducerMessage{
+    Transaction: txn,
+    Payload:     []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", 
i)),
+  })
+  if err != nil {
+    log.Printf("send to producerTwo fail %v", err)
+    txn.Abort(ctx)
+  }
+  // Step 7: the consumers acknowledge the input message with the transactions 
*individually*.
+  err = inputConsumer.AckWithTxn(message, txn)
+  if err != nil {
+    log.Printf("ack message fail %v", err)
+    txn.Abort(ctx)
+  }
+  // Step 8: commit transactions.
+  err = txn.Commit(ctx)
+  if err != nil {
+    log.Printf("commit txn fail %v", err)
+  }
+}
+
+// Final result: consume messages from output topics and print them.
+for i := 0; i < count; i++ {
+  message, _ := outputConsumerOne.Receive(ctx)
+  log.Printf("Receive transaction message: %s", string(message.Payload()))
+}
+for i := 0; i < count; i++ {
+  message, _ := outputConsumerTwo.Receive(ctx)
+  log.Printf("Receive transaction message: %s", string(message.Payload()))
+}
+```
 
 </TabItem>
 </Tabs>
diff --git a/versioned_docs/version-3.2.x/txn-use.md 
b/versioned_docs/version-3.2.x/txn-use.md
index 193a03de4dbb..556d868b6c4c 100644
--- a/versioned_docs/version-3.2.x/txn-use.md
+++ b/versioned_docs/version-3.2.x/txn-use.md
@@ -5,6 +5,11 @@ sidebar_label: "Get started"
 description: Get started to use Pulsar transaction API.
 ---
 
+````mdx-code-block
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+````
+
 Pulsar transaction is primarily a server-side and protocol-level feature. This 
tutorial guides you through every step of how to use the [Pulsar transaction 
API](/api/admin/) to send and receive messages in a Java client.
 
 :::note
@@ -78,8 +83,6 @@ To use Pulsar transaction API, complete the following steps.
 
     :::
 
-    **Input**
-
 ````mdx-code-block
 <Tabs groupId="api-choice"
   defaultValue="Java"
@@ -87,205 +90,203 @@ To use Pulsar transaction API, complete the following 
steps.
 
 <TabItem value="Java">
 
-    ```java
-    PulsarClient client = PulsarClient.builder()
-                    // Step 3: create a Pulsar client and enable transactions.
-                    .enableTransaction(true)
-                    .serviceUrl(jct.serviceUrl)
-                    .build();
-
-            // Step 4: create three producers to produce messages to input and 
output topics.
-            ProducerBuilder<String> producerBuilder = 
client.newProducer(Schema.STRING);
-            Producer<String> inputProducer = producerBuilder.topic(inputTopic)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            Producer<String> outputProducerOne = 
producerBuilder.topic(outputTopicOne)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            Producer<String> outputProducerTwo = 
producerBuilder.topic(outputTopicTwo)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            // Step 4: create three consumers to consume messages from input 
and output topics.
-            Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
-            Consumer<String> outputConsumerOne = 
client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
-            Consumer<String> outputConsumerTwo = 
client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
-
-            int count = 2;
-            // Step 5: produce messages to input topics.
-            for (int i = 0; i < count; i++) {
-                inputProducer.send("Hello Pulsar! count : " + i);
-            }
-
-            // Step 5: consume messages and produce them to output topics with 
transactions.
-            for (int i = 0; i < count; i++) {
-
-                // Step 5: the consumer successfully receives messages.
-                Message<String> message = inputConsumer.receive();
-
-                // Step 6: create transactions.
-                // The transaction timeout is specified as 10 seconds.
-                // If the transaction is not committed within 10 seconds, the 
transaction is automatically aborted.
-                Transaction txn = null;
-                try {
-                    txn = client.newTransaction()
-                            .withTransactionTimeout(10, 
TimeUnit.SECONDS).build().get();
-                    // Step 6: you can process the received message with your 
use case and business logic.
-
-                    // Step 7: the producers produce messages to output topics 
with transactions
-                    outputProducerOne.newMessage(txn).value("Hello Pulsar! 
outputTopicOne count : " + i).send();
-                    outputProducerTwo.newMessage(txn).value("Hello Pulsar! 
outputTopicTwo count : " + i).send();
-
-                    // Step 7: the consumers acknowledge the input message 
with the transactions *individually*.
-                    inputConsumer.acknowledgeAsync(message.getMessageId(), 
txn).get();
-                    // Step 8: commit transactions.
-                    txn.commit().get();
-                } catch (ExecutionException e) {
-                    if (!(e.getCause() instanceof 
PulsarClientException.TransactionConflictException)) {
-                        // If TransactionConflictException is not thrown,
-                        // you need to redeliver or negativeAcknowledge this 
message,
-                        // or else this message will not be received again.
-                        inputConsumer.negativeAcknowledge(message);
-                    }
-
-                    // If a new transaction is created,
-                    // then the old transaction should be aborted.
-                    if (txn != null) {
-                        txn.abort();
-                    }
-                }
-            }
-
-            // Final result: consume messages from output topics and print 
them.
-            for (int i = 0; i < count; i++) {
-                Message<String> message =  outputConsumerOne.receive();
-                System.out.println("Receive transaction message: " + 
message.getValue());
-            }
-
-            for (int i = 0; i < count; i++) {
-                Message<String> message =  outputConsumerTwo.receive();
-                System.out.println("Receive transaction message: " + 
message.getValue());
-            }
+```java
+PulsarClient client = PulsarClient.builder()
+                // Step 3: create a Pulsar client and enable transactions.
+                .enableTransaction(true)
+                .serviceUrl(jct.serviceUrl)
+                .build();
+
+// Step 4: create three producers to produce messages to input and output 
topics.
+ProducerBuilder<String> producerBuilder = client.newProducer(Schema.STRING);
+Producer<String> inputProducer = producerBuilder.topic(inputTopic)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+Producer<String> outputProducerOne = producerBuilder.topic(outputTopicOne)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+Producer<String> outputProducerTwo = producerBuilder.topic(outputTopicTwo)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+// Step 4: create three consumers to consume messages from input and output 
topics.
+Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
+Consumer<String> outputConsumerOne = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
+Consumer<String> outputConsumerTwo = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
+
+int count = 2;
+// Step 5: produce messages to input topics.
+for (int i = 0; i < count; i++) {
+    inputProducer.send("Hello Pulsar! count : " + i);
+}
+
+// Step 5: consume messages and produce them to output topics with 
transactions.
+for (int i = 0; i < count; i++) {
+
+    // Step 5: the consumer successfully receives messages.
+    Message<String> message = inputConsumer.receive();
+
+    // Step 6: create transactions.
+    // The transaction timeout is specified as 10 seconds.
+    // If the transaction is not committed within 10 seconds, the transaction 
is automatically aborted.
+    Transaction txn = null;
+    try {
+        txn = client.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        // Step 6: you can process the received message with your use case and 
business logic.
+
+        // Step 7: the producers produce messages to output topics with 
transactions
+        outputProducerOne.newMessage(txn).value("Hello Pulsar! outputTopicOne 
count : " + i).send();
+        outputProducerTwo.newMessage(txn).value("Hello Pulsar! outputTopicTwo 
count : " + i).send();
+
+        // Step 7: the consumers acknowledge the input message with the 
transactions *individually*.
+        inputConsumer.acknowledgeAsync(message.getMessageId(), txn).get();
+        // Step 8: commit transactions.
+        txn.commit().get();
+    } catch (ExecutionException e) {
+        if (!(e.getCause() instanceof 
PulsarClientException.TransactionConflictException)) {
+            // If TransactionConflictException is not thrown,
+            // you need to redeliver or negativeAcknowledge this message,
+            // or else this message will not be received again.
+            inputConsumer.negativeAcknowledge(message);
+        }
+
+        // If a new transaction is created,
+        // then the old transaction should be aborted.
+        if (txn != null) {
+            txn.abort();
         }
     }
-    ```
+}
+
+// Final result: consume messages from output topics and print them.
+for (int i = 0; i < count; i++) {
+    Message<String> message =  outputConsumerOne.receive();
+    System.out.println("Receive transaction message: " + message.getValue());
+}
+
+for (int i = 0; i < count; i++) {
+    Message<String> message =  outputConsumerTwo.receive();
+    System.out.println("Receive transaction message: " + message.getValue());
+}
+```
 
 </TabItem>
 <TabItem value="Go">
 
-  ```go
-       // Step 3: create a Pulsar client and enable transactions.
-       client, err := pulsar.NewClient(pulsar.ClientOptions{
-               URL:               "<serviceUrl>",
-               EnableTransaction: true,
-       })
-       if err != nil {
-               log.Fatalf("create client fail, err = %v", err)
-       }
-       defer client.Close()
-       // Step 4: create three producers to produce messages to input and 
output topics.
-       inputTopic := "inputTopic"
-       outputTopicOne := "outputTopicOne"
-       outputTopicTwo := "outputTopicTwo"
-       subscriptionName := "your-subscription-name"
-       inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       inputTopic,
-               SendTimeout: 0,
-       })
-       defer inputProducer.Close()
-       outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       outputTopicOne,
-               SendTimeout: 0,
-       })
-       defer outputProducerOne.Close()
-       outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       outputTopicTwo,
-               SendTimeout: 0,
-       })
-       defer outputProducerTwo.Close()
-
-       // Step 4: create three consumers to consume messages from input and 
output topics.
-       inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            inputTopic,
-               SubscriptionName: subscriptionName,
-       })
-       defer inputConsumer.Close()
-       outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            outputTopicOne,
-               SubscriptionName: subscriptionName,
-       })
-       defer outputConsumerOne.Close()
-       outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            outputTopicTwo,
-               SubscriptionName: subscriptionName,
-       })
-       defer outputConsumerTwo.Close()
-
-       // Step 5: produce messages to input topics.
-       ctx := context.Background()
-       count := 2
-       for i := 0; i < count; i++ {
-               inputProducer.Send(ctx, &pulsar.ProducerMessage{
-                       Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", 
i)),
-               })
-       }
-       // Step 5: consume messages and produce them to output topics with 
transactions.
-       for i := 0; i < count; i++ {
-               // Step 5: the consumer successfully receives messages.
-               message, err := inputConsumer.Receive(ctx)
-               if err != nil {
-                       log.Printf("receive message from %s fail, err = %v", 
inputTopic, err)
-                       continue
-               }
-               // Step 6: create transactions.
-               // The transaction timeout is specified as 10 seconds.
-               // If the transaction is not committed within 10 seconds, the 
transaction is automatically aborted.
-               txn, err := client.NewTransaction(10 * time.Second)
-               if err != nil {
-                       log.Printf("create txn fail, err = %v", err)
-                       continue
-               }
-               // Step 6: you can process the received message with your use 
case and business logic.
-               // processMessage(message)
-               // Step 7: the producers produce messages to output topics with 
transactions
-               _, err = outputProducerOne.Send(context.Background(), 
&pulsar.ProducerMessage{
-                       Transaction: txn,
-                       Payload:     []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicOne count : %d", i)),
-               })
-               if err != nil {
-                       log.Printf("send to producerOne fail %v", err)
-                       txn.Abort(ctx)
-               }
-               _, err = outputProducerTwo.Send(context.Background(), 
&pulsar.ProducerMessage{
-                       Transaction: txn,
-                       Payload:     []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicTwo count : %d", i)),
-               })
-               if err != nil {
-                       log.Printf("send to producerTwo fail %v", err)
-                       txn.Abort(ctx)
-               }
-               // Step 7: the consumers acknowledge the input message with the 
transactions *individually*.
-               err = inputConsumer.AckWithTxn(message, txn)
-               if err != nil {
-                       log.Printf("ack message fail %v", err)
-                       txn.Abort(ctx)
-               }
-               // Step 8: commit transactions.
-               err = txn.Commit(ctx)
-               if err != nil {
-                       log.Printf("commit txn fail %v", err)
-               }
-       }
-
-       // Final result: consume messages from output topics and print them.
-       for i := 0; i < count; i++ {
-               message, _ := outputConsumerOne.Receive(ctx)
-               log.Printf("Receive transaction message: %s", 
string(message.Payload()))
-       }
-       for i := 0; i < count; i++ {
-               message, _ := outputConsumerTwo.Receive(ctx)
-               log.Printf("Receive transaction message: %s", 
string(message.Payload()))
-       }
-  ```
+```go
+// Step 3: create a Pulsar client and enable transactions.
+client, err := pulsar.NewClient(pulsar.ClientOptions{
+  URL:               "<serviceUrl>",
+  EnableTransaction: true,
+})
+if err != nil {
+  log.Fatalf("create client fail, err = %v", err)
+}
+defer client.Close()
+// Step 4: create three producers to produce messages to input and output 
topics.
+inputTopic := "inputTopic"
+outputTopicOne := "outputTopicOne"
+outputTopicTwo := "outputTopicTwo"
+subscriptionName := "your-subscription-name"
+inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       inputTopic,
+  SendTimeout: 0,
+})
+defer inputProducer.Close()
+outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       outputTopicOne,
+  SendTimeout: 0,
+})
+defer outputProducerOne.Close()
+outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       outputTopicTwo,
+  SendTimeout: 0,
+})
+defer outputProducerTwo.Close()
+
+// Step 4: create three consumers to consume messages from input and output 
topics.
+inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            inputTopic,
+  SubscriptionName: subscriptionName,
+})
+defer inputConsumer.Close()
+outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            outputTopicOne,
+  SubscriptionName: subscriptionName,
+})
+defer outputConsumerOne.Close()
+outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            outputTopicTwo,
+  SubscriptionName: subscriptionName,
+})
+defer outputConsumerTwo.Close()
+
+// Step 5: produce messages to input topics.
+ctx := context.Background()
+count := 2
+for i := 0; i < count; i++ {
+  inputProducer.Send(ctx, &pulsar.ProducerMessage{
+    Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)),
+  })
+}
+// Step 5: consume messages and produce them to output topics with 
transactions.
+for i := 0; i < count; i++ {
+  // Step 5: the consumer successfully receives messages.
+  message, err := inputConsumer.Receive(ctx)
+  if err != nil {
+    log.Printf("receive message from %s fail, err = %v", inputTopic, err)
+    continue
+  }
+  // Step 6: create transactions.
+  // The transaction timeout is specified as 10 seconds.
+  // If the transaction is not committed within 10 seconds, the transaction is 
automatically aborted.
+  txn, err := client.NewTransaction(10 * time.Second)
+  if err != nil {
+    log.Printf("create txn fail, err = %v", err)
+    continue
+  }
+  // Step 6: you can process the received message with your use case and 
business logic.
+  // processMessage(message)
+  // Step 7: the producers produce messages to output topics with transactions
+  _, err = outputProducerOne.Send(context.Background(), 
&pulsar.ProducerMessage{
+    Transaction: txn,
+    Payload:     []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", 
i)),
+  })
+  if err != nil {
+    log.Printf("send to producerOne fail %v", err)
+    txn.Abort(ctx)
+  }
+  _, err = outputProducerTwo.Send(context.Background(), 
&pulsar.ProducerMessage{
+    Transaction: txn,
+    Payload:     []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", 
i)),
+  })
+  if err != nil {
+    log.Printf("send to producerTwo fail %v", err)
+    txn.Abort(ctx)
+  }
+  // Step 7: the consumers acknowledge the input message with the transactions 
*individually*.
+  err = inputConsumer.AckWithTxn(message, txn)
+  if err != nil {
+    log.Printf("ack message fail %v", err)
+    txn.Abort(ctx)
+  }
+  // Step 8: commit transactions.
+  err = txn.Commit(ctx)
+  if err != nil {
+    log.Printf("commit txn fail %v", err)
+  }
+}
+
+// Final result: consume messages from output topics and print them.
+for i := 0; i < count; i++ {
+  message, _ := outputConsumerOne.Receive(ctx)
+  log.Printf("Receive transaction message: %s", string(message.Payload()))
+}
+for i := 0; i < count; i++ {
+  message, _ := outputConsumerTwo.Receive(ctx)
+  log.Printf("Receive transaction message: %s", string(message.Payload()))
+}
+```
 
 </TabItem>
 </Tabs>
diff --git a/versioned_docs/version-3.3.x/txn-use.md 
b/versioned_docs/version-3.3.x/txn-use.md
index 193a03de4dbb..8aed80e01190 100644
--- a/versioned_docs/version-3.3.x/txn-use.md
+++ b/versioned_docs/version-3.3.x/txn-use.md
@@ -5,6 +5,11 @@ sidebar_label: "Get started"
 description: Get started to use Pulsar transaction API.
 ---
 
+````mdx-code-block
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+````
+
 Pulsar transaction is primarily a server-side and protocol-level feature. This 
tutorial guides you through every step of how to use the [Pulsar transaction 
API](/api/admin/) to send and receive messages in a Java client.
 
 :::note
@@ -78,7 +83,6 @@ To use Pulsar transaction API, complete the following steps.
 
     :::
 
-    **Input**
 
 ````mdx-code-block
 <Tabs groupId="api-choice"
@@ -87,205 +91,203 @@ To use Pulsar transaction API, complete the following 
steps.
 
 <TabItem value="Java">
 
-    ```java
-    PulsarClient client = PulsarClient.builder()
-                    // Step 3: create a Pulsar client and enable transactions.
-                    .enableTransaction(true)
-                    .serviceUrl(jct.serviceUrl)
-                    .build();
-
-            // Step 4: create three producers to produce messages to input and 
output topics.
-            ProducerBuilder<String> producerBuilder = 
client.newProducer(Schema.STRING);
-            Producer<String> inputProducer = producerBuilder.topic(inputTopic)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            Producer<String> outputProducerOne = 
producerBuilder.topic(outputTopicOne)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            Producer<String> outputProducerTwo = 
producerBuilder.topic(outputTopicTwo)
-                    .sendTimeout(0, TimeUnit.SECONDS).create();
-            // Step 4: create three consumers to consume messages from input 
and output topics.
-            Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
-            Consumer<String> outputConsumerOne = 
client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
-            Consumer<String> outputConsumerTwo = 
client.newConsumer(Schema.STRING)
-                    
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
-
-            int count = 2;
-            // Step 5: produce messages to input topics.
-            for (int i = 0; i < count; i++) {
-                inputProducer.send("Hello Pulsar! count : " + i);
-            }
-
-            // Step 5: consume messages and produce them to output topics with 
transactions.
-            for (int i = 0; i < count; i++) {
-
-                // Step 5: the consumer successfully receives messages.
-                Message<String> message = inputConsumer.receive();
-
-                // Step 6: create transactions.
-                // The transaction timeout is specified as 10 seconds.
-                // If the transaction is not committed within 10 seconds, the 
transaction is automatically aborted.
-                Transaction txn = null;
-                try {
-                    txn = client.newTransaction()
-                            .withTransactionTimeout(10, 
TimeUnit.SECONDS).build().get();
-                    // Step 6: you can process the received message with your 
use case and business logic.
-
-                    // Step 7: the producers produce messages to output topics 
with transactions
-                    outputProducerOne.newMessage(txn).value("Hello Pulsar! 
outputTopicOne count : " + i).send();
-                    outputProducerTwo.newMessage(txn).value("Hello Pulsar! 
outputTopicTwo count : " + i).send();
-
-                    // Step 7: the consumers acknowledge the input message 
with the transactions *individually*.
-                    inputConsumer.acknowledgeAsync(message.getMessageId(), 
txn).get();
-                    // Step 8: commit transactions.
-                    txn.commit().get();
-                } catch (ExecutionException e) {
-                    if (!(e.getCause() instanceof 
PulsarClientException.TransactionConflictException)) {
-                        // If TransactionConflictException is not thrown,
-                        // you need to redeliver or negativeAcknowledge this 
message,
-                        // or else this message will not be received again.
-                        inputConsumer.negativeAcknowledge(message);
-                    }
-
-                    // If a new transaction is created,
-                    // then the old transaction should be aborted.
-                    if (txn != null) {
-                        txn.abort();
-                    }
-                }
-            }
-
-            // Final result: consume messages from output topics and print 
them.
-            for (int i = 0; i < count; i++) {
-                Message<String> message =  outputConsumerOne.receive();
-                System.out.println("Receive transaction message: " + 
message.getValue());
-            }
-
-            for (int i = 0; i < count; i++) {
-                Message<String> message =  outputConsumerTwo.receive();
-                System.out.println("Receive transaction message: " + 
message.getValue());
-            }
+```java
+PulsarClient client = PulsarClient.builder()
+                // Step 3: create a Pulsar client and enable transactions.
+                .enableTransaction(true)
+                .serviceUrl(jct.serviceUrl)
+                .build();
+
+// Step 4: create three producers to produce messages to input and output 
topics.
+ProducerBuilder<String> producerBuilder = client.newProducer(Schema.STRING);
+Producer<String> inputProducer = producerBuilder.topic(inputTopic)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+Producer<String> outputProducerOne = producerBuilder.topic(outputTopicOne)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+Producer<String> outputProducerTwo = producerBuilder.topic(outputTopicTwo)
+        .sendTimeout(0, TimeUnit.SECONDS).create();
+// Step 4: create three consumers to consume messages from input and output 
topics.
+Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
+Consumer<String> outputConsumerOne = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
+Consumer<String> outputConsumerTwo = client.newConsumer(Schema.STRING)
+        
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
+
+int count = 2;
+// Step 5: produce messages to input topics.
+for (int i = 0; i < count; i++) {
+    inputProducer.send("Hello Pulsar! count : " + i);
+}
+
+// Step 5: consume messages and produce them to output topics with 
transactions.
+for (int i = 0; i < count; i++) {
+
+    // Step 5: the consumer successfully receives messages.
+    Message<String> message = inputConsumer.receive();
+
+    // Step 6: create transactions.
+    // The transaction timeout is specified as 10 seconds.
+    // If the transaction is not committed within 10 seconds, the transaction 
is automatically aborted.
+    Transaction txn = null;
+    try {
+        txn = client.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        // Step 6: you can process the received message with your use case and 
business logic.
+
+        // Step 7: the producers produce messages to output topics with 
transactions
+        outputProducerOne.newMessage(txn).value("Hello Pulsar! outputTopicOne 
count : " + i).send();
+        outputProducerTwo.newMessage(txn).value("Hello Pulsar! outputTopicTwo 
count : " + i).send();
+
+        // Step 7: the consumers acknowledge the input message with the 
transactions *individually*.
+        inputConsumer.acknowledgeAsync(message.getMessageId(), txn).get();
+        // Step 8: commit transactions.
+        txn.commit().get();
+    } catch (ExecutionException e) {
+        if (!(e.getCause() instanceof 
PulsarClientException.TransactionConflictException)) {
+            // If TransactionConflictException is not thrown,
+            // you need to redeliver or negativeAcknowledge this message,
+            // or else this message will not be received again.
+            inputConsumer.negativeAcknowledge(message);
+        }
+
+        // If a new transaction is created,
+        // then the old transaction should be aborted.
+        if (txn != null) {
+            txn.abort();
         }
     }
-    ```
+}
+
+// Final result: consume messages from output topics and print them.
+for (int i = 0; i < count; i++) {
+    Message<String> message =  outputConsumerOne.receive();
+    System.out.println("Receive transaction message: " + message.getValue());
+}
+
+for (int i = 0; i < count; i++) {
+    Message<String> message =  outputConsumerTwo.receive();
+    System.out.println("Receive transaction message: " + message.getValue());
+}
+```
 
 </TabItem>
 <TabItem value="Go">
 
-  ```go
-       // Step 3: create a Pulsar client and enable transactions.
-       client, err := pulsar.NewClient(pulsar.ClientOptions{
-               URL:               "<serviceUrl>",
-               EnableTransaction: true,
-       })
-       if err != nil {
-               log.Fatalf("create client fail, err = %v", err)
-       }
-       defer client.Close()
-       // Step 4: create three producers to produce messages to input and 
output topics.
-       inputTopic := "inputTopic"
-       outputTopicOne := "outputTopicOne"
-       outputTopicTwo := "outputTopicTwo"
-       subscriptionName := "your-subscription-name"
-       inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       inputTopic,
-               SendTimeout: 0,
-       })
-       defer inputProducer.Close()
-       outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       outputTopicOne,
-               SendTimeout: 0,
-       })
-       defer outputProducerOne.Close()
-       outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:       outputTopicTwo,
-               SendTimeout: 0,
-       })
-       defer outputProducerTwo.Close()
-
-       // Step 4: create three consumers to consume messages from input and 
output topics.
-       inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            inputTopic,
-               SubscriptionName: subscriptionName,
-       })
-       defer inputConsumer.Close()
-       outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            outputTopicOne,
-               SubscriptionName: subscriptionName,
-       })
-       defer outputConsumerOne.Close()
-       outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            outputTopicTwo,
-               SubscriptionName: subscriptionName,
-       })
-       defer outputConsumerTwo.Close()
-
-       // Step 5: produce messages to input topics.
-       ctx := context.Background()
-       count := 2
-       for i := 0; i < count; i++ {
-               inputProducer.Send(ctx, &pulsar.ProducerMessage{
-                       Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", 
i)),
-               })
-       }
-       // Step 5: consume messages and produce them to output topics with 
transactions.
-       for i := 0; i < count; i++ {
-               // Step 5: the consumer successfully receives messages.
-               message, err := inputConsumer.Receive(ctx)
-               if err != nil {
-                       log.Printf("receive message from %s fail, err = %v", 
inputTopic, err)
-                       continue
-               }
-               // Step 6: create transactions.
-               // The transaction timeout is specified as 10 seconds.
-               // If the transaction is not committed within 10 seconds, the 
transaction is automatically aborted.
-               txn, err := client.NewTransaction(10 * time.Second)
-               if err != nil {
-                       log.Printf("create txn fail, err = %v", err)
-                       continue
-               }
-               // Step 6: you can process the received message with your use 
case and business logic.
-               // processMessage(message)
-               // Step 7: the producers produce messages to output topics with 
transactions
-               _, err = outputProducerOne.Send(context.Background(), 
&pulsar.ProducerMessage{
-                       Transaction: txn,
-                       Payload:     []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicOne count : %d", i)),
-               })
-               if err != nil {
-                       log.Printf("send to producerOne fail %v", err)
-                       txn.Abort(ctx)
-               }
-               _, err = outputProducerTwo.Send(context.Background(), 
&pulsar.ProducerMessage{
-                       Transaction: txn,
-                       Payload:     []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicTwo count : %d", i)),
-               })
-               if err != nil {
-                       log.Printf("send to producerTwo fail %v", err)
-                       txn.Abort(ctx)
-               }
-               // Step 7: the consumers acknowledge the input message with the 
transactions *individually*.
-               err = inputConsumer.AckWithTxn(message, txn)
-               if err != nil {
-                       log.Printf("ack message fail %v", err)
-                       txn.Abort(ctx)
-               }
-               // Step 8: commit transactions.
-               err = txn.Commit(ctx)
-               if err != nil {
-                       log.Printf("commit txn fail %v", err)
-               }
-       }
-
-       // Final result: consume messages from output topics and print them.
-       for i := 0; i < count; i++ {
-               message, _ := outputConsumerOne.Receive(ctx)
-               log.Printf("Receive transaction message: %s", 
string(message.Payload()))
-       }
-       for i := 0; i < count; i++ {
-               message, _ := outputConsumerTwo.Receive(ctx)
-               log.Printf("Receive transaction message: %s", 
string(message.Payload()))
-       }
-  ```
+```go
+// Step 3: create a Pulsar client and enable transactions.
+client, err := pulsar.NewClient(pulsar.ClientOptions{
+  URL:               "<serviceUrl>",
+  EnableTransaction: true,
+})
+if err != nil {
+  log.Fatalf("create client fail, err = %v", err)
+}
+defer client.Close()
+// Step 4: create three producers to produce messages to input and output 
topics.
+inputTopic := "inputTopic"
+outputTopicOne := "outputTopicOne"
+outputTopicTwo := "outputTopicTwo"
+subscriptionName := "your-subscription-name"
+inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       inputTopic,
+  SendTimeout: 0,
+})
+defer inputProducer.Close()
+outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       outputTopicOne,
+  SendTimeout: 0,
+})
+defer outputProducerOne.Close()
+outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
+  Topic:       outputTopicTwo,
+  SendTimeout: 0,
+})
+defer outputProducerTwo.Close()
+
+// Step 4: create three consumers to consume messages from input and output 
topics.
+inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            inputTopic,
+  SubscriptionName: subscriptionName,
+})
+defer inputConsumer.Close()
+outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            outputTopicOne,
+  SubscriptionName: subscriptionName,
+})
+defer outputConsumerOne.Close()
+outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
+  Topic:            outputTopicTwo,
+  SubscriptionName: subscriptionName,
+})
+defer outputConsumerTwo.Close()
+
+// Step 5: produce messages to input topics.
+ctx := context.Background()
+count := 2
+for i := 0; i < count; i++ {
+  inputProducer.Send(ctx, &pulsar.ProducerMessage{
+    Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)),
+  })
+}
+// Step 5: consume messages and produce them to output topics with 
transactions.
+for i := 0; i < count; i++ {
+  // Step 5: the consumer successfully receives messages.
+  message, err := inputConsumer.Receive(ctx)
+  if err != nil {
+    log.Printf("receive message from %s fail, err = %v", inputTopic, err)
+    continue
+  }
+  // Step 6: create transactions.
+  // The transaction timeout is specified as 10 seconds.
+  // If the transaction is not committed within 10 seconds, the transaction is 
automatically aborted.
+  txn, err := client.NewTransaction(10 * time.Second)
+  if err != nil {
+    log.Printf("create txn fail, err = %v", err)
+    continue
+  }
+  // Step 6: you can process the received message with your use case and 
business logic.
+  // processMessage(message)
+  // Step 7: the producers produce messages to output topics with transactions
+  _, err = outputProducerOne.Send(context.Background(), 
&pulsar.ProducerMessage{
+    Transaction: txn,
+    Payload:     []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", 
i)),
+  })
+  if err != nil {
+    log.Printf("send to producerOne fail %v", err)
+    txn.Abort(ctx)
+  }
+  _, err = outputProducerTwo.Send(context.Background(), 
&pulsar.ProducerMessage{
+    Transaction: txn,
+    Payload:     []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", 
i)),
+  })
+  if err != nil {
+    log.Printf("send to producerTwo fail %v", err)
+    txn.Abort(ctx)
+  }
+  // Step 7: the consumers acknowledge the input message with the transactions 
*individually*.
+  err = inputConsumer.AckWithTxn(message, txn)
+  if err != nil {
+    log.Printf("ack message fail %v", err)
+    txn.Abort(ctx)
+  }
+  // Step 8: commit transactions.
+  err = txn.Commit(ctx)
+  if err != nil {
+    log.Printf("commit txn fail %v", err)
+  }
+}
+
+// Final result: consume messages from output topics and print them.
+for i := 0; i < count; i++ {
+  message, _ := outputConsumerOne.Receive(ctx)
+  log.Printf("Receive transaction message: %s", string(message.Payload()))
+}
+for i := 0; i < count; i++ {
+  message, _ := outputConsumerTwo.Receive(ctx)
+  log.Printf("Receive transaction message: %s", string(message.Payload()))
+}
+```
 
 </TabItem>
 </Tabs>

Reply via email to