This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git
The following commit(s) were added to refs/heads/main by this push:
new df8ff29a7d92 [improve][doc] add golang transaction demo in txn-use.md
(#948)
df8ff29a7d92 is described below
commit df8ff29a7d92ec539fe626a41b0c9277bbbae1b7
Author: zhou zhuohan <[email protected]>
AuthorDate: Mon Aug 12 18:49:51 2024 +0800
[improve][doc] add golang transaction demo in txn-use.md (#948)
Co-authored-by: ninjazhou <[email protected]>
---
docs/txn-use.md | 131 +++++++++++++++++++++++++++++++-
versioned_docs/version-3.0.x/txn-use.md | 131 +++++++++++++++++++++++++++++++-
versioned_docs/version-3.1.x/txn-use.md | 131 +++++++++++++++++++++++++++++++-
versioned_docs/version-3.2.x/txn-use.md | 131 +++++++++++++++++++++++++++++++-
versioned_docs/version-3.3.x/txn-use.md | 131 +++++++++++++++++++++++++++++++-
5 files changed, 650 insertions(+), 5 deletions(-)
diff --git a/docs/txn-use.md b/docs/txn-use.md
index 6d91283422a5..193a03de4dbb 100644
--- a/docs/txn-use.md
+++ b/docs/txn-use.md
@@ -54,7 +54,7 @@ To use Pulsar transaction API, complete the following steps.
Transaction coordinator metadata setup success
```
-3. Create a Pulsar client and enable transactions.
+3. Create a Pulsar client and enable transactions. Because the client needs to
look up the transaction coordinator from a system topic, make sure your client
role has produce and consume permissions on the system namespace `pulsar/system`.
4. Create producers and consumers.
@@ -80,6 +80,13 @@ To use Pulsar transaction API, complete the following steps.
**Input**
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+ values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
```java
PulsarClient client = PulsarClient.builder()
// Step 3: create a Pulsar client and enable transactions.
@@ -162,6 +169,128 @@ To use Pulsar transaction API, complete the following
steps.
}
```
+</TabItem>
+<TabItem value="Go">
+
+ ```go
+ // Step 3: create a Pulsar client and enable transactions.
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: "<serviceUrl>",
+ EnableTransaction: true,
+ })
+ if err != nil {
+ log.Fatalf("create client fail, err = %v", err)
+ }
+ defer client.Close()
+ // Step 4: create three producers to produce messages to input and
output topics.
+ inputTopic := "inputTopic"
+ outputTopicOne := "outputTopicOne"
+ outputTopicTwo := "outputTopicTwo"
+ subscriptionName := "your-subscription-name"
+ inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: inputTopic,
+ SendTimeout: 0,
+ })
+ defer inputProducer.Close()
+ outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: outputTopicOne,
+ SendTimeout: 0,
+ })
+ defer outputProducerOne.Close()
+ outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: outputTopicTwo,
+ SendTimeout: 0,
+ })
+ defer outputProducerTwo.Close()
+
+ // Step 4: create three consumers to consume messages from input and
output topics.
+ inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: inputTopic,
+ SubscriptionName: subscriptionName,
+ })
+ defer inputConsumer.Close()
+ outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: outputTopicOne,
+ SubscriptionName: subscriptionName,
+ })
+ defer outputConsumerOne.Close()
+ outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: outputTopicTwo,
+ SubscriptionName: subscriptionName,
+ })
+ defer outputConsumerTwo.Close()
+
+ // Step 5: produce messages to input topics.
+ ctx := context.Background()
+ count := 2
+ for i := 0; i < count; i++ {
+ inputProducer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d",
i)),
+ })
+ }
+ // Step 5: consume messages and produce them to output topics with
transactions.
+ for i := 0; i < count; i++ {
+ // Step 5: the consumer successfully receives messages.
+ message, err := inputConsumer.Receive(ctx)
+ if err != nil {
+ log.Printf("receive message from %s fail, err = %v",
inputTopic, err)
+ continue
+ }
+ // Step 6: create transactions.
+ // The transaction timeout is specified as 10 seconds.
+ // If the transaction is not committed within 10 seconds, the
transaction is automatically aborted.
+ txn, err := client.NewTransaction(10 * time.Second)
+ if err != nil {
+ log.Printf("create txn fail, err = %v", err)
+ continue
+ }
+ // Step 6: you can process the received message with your use
case and business logic.
+ // processMessage(message)
+ // Step 7: the producers produce messages to output topics with
transactions
+ _, err = outputProducerOne.Send(context.Background(),
&pulsar.ProducerMessage{
+ Transaction: txn,
+ Payload: []byte(fmt.Sprintf("Hello Pulsar!
outputTopicOne count : %d", i)),
+ })
+ if err != nil {
+ log.Printf("send to producerOne fail %v", err)
+ txn.Abort(ctx)
+ }
+ _, err = outputProducerTwo.Send(context.Background(),
&pulsar.ProducerMessage{
+ Transaction: txn,
+ Payload: []byte(fmt.Sprintf("Hello Pulsar!
outputTopicTwo count : %d", i)),
+ })
+ if err != nil {
+ log.Printf("send to producerTwo fail %v", err)
+ txn.Abort(ctx)
+ }
+ // Step 7: the consumers acknowledge the input message with the
transactions *individually*.
+ err = inputConsumer.AckWithTxn(message, txn)
+ if err != nil {
+ log.Printf("ack message fail %v", err)
+ txn.Abort(ctx)
+ }
+ // Step 8: commit transactions.
+ err = txn.Commit(ctx)
+ if err != nil {
+ log.Printf("commit txn fail %v", err)
+ }
+ }
+
+ // Final result: consume messages from output topics and print them.
+ for i := 0; i < count; i++ {
+ message, _ := outputConsumerOne.Receive(ctx)
+ log.Printf("Receive transaction message: %s",
string(message.Payload()))
+ }
+ for i := 0; i < count; i++ {
+ message, _ := outputConsumerTwo.Receive(ctx)
+ log.Printf("Receive transaction message: %s",
string(message.Payload()))
+ }
+ ```
+
+</TabItem>
+</Tabs>
+````
+
**Output**
```java
diff --git a/versioned_docs/version-3.0.x/txn-use.md
b/versioned_docs/version-3.0.x/txn-use.md
index 9b19bc7ef542..6886ea8a83ca 100644
--- a/versioned_docs/version-3.0.x/txn-use.md
+++ b/versioned_docs/version-3.0.x/txn-use.md
@@ -49,7 +49,7 @@ Currently, [Pulsar transaction API](/api/admin/) is available
in **Pulsar 2.8.0
Transaction coordinator metadata setup success
```
-3. Create a Pulsar client and enable transactions.
+3. Create a Pulsar client and enable transactions. Because the client needs to
look up the transaction coordinator from a system topic, make sure your client
role has produce and consume permissions on the system namespace `pulsar/system`.
4. Create producers and consumers.
@@ -75,6 +75,13 @@ Currently, [Pulsar transaction API](/api/admin/) is
available in **Pulsar 2.8.0
**Input**
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+ values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
```java
PulsarClient client = PulsarClient.builder()
// Step 3: create a Pulsar client and enable transactions.
@@ -157,6 +164,128 @@ Currently, [Pulsar transaction API](/api/admin/) is
available in **Pulsar 2.8.0
}
```
+</TabItem>
+<TabItem value="Go">
+
+ ```go
+ // Step 3: create a Pulsar client and enable transactions.
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: "<serviceUrl>",
+ EnableTransaction: true,
+ })
+ if err != nil {
+ log.Fatalf("create client fail, err = %v", err)
+ }
+ defer client.Close()
+ // Step 4: create three producers to produce messages to input and
output topics.
+ inputTopic := "inputTopic"
+ outputTopicOne := "outputTopicOne"
+ outputTopicTwo := "outputTopicTwo"
+ subscriptionName := "your-subscription-name"
+ inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: inputTopic,
+ SendTimeout: 0,
+ })
+ defer inputProducer.Close()
+ outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: outputTopicOne,
+ SendTimeout: 0,
+ })
+ defer outputProducerOne.Close()
+ outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: outputTopicTwo,
+ SendTimeout: 0,
+ })
+ defer outputProducerTwo.Close()
+
+ // Step 4: create three consumers to consume messages from input and
output topics.
+ inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: inputTopic,
+ SubscriptionName: subscriptionName,
+ })
+ defer inputConsumer.Close()
+ outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: outputTopicOne,
+ SubscriptionName: subscriptionName,
+ })
+ defer outputConsumerOne.Close()
+ outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: outputTopicTwo,
+ SubscriptionName: subscriptionName,
+ })
+ defer outputConsumerTwo.Close()
+
+ // Step 5: produce messages to input topics.
+ ctx := context.Background()
+ count := 2
+ for i := 0; i < count; i++ {
+ inputProducer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d",
i)),
+ })
+ }
+ // Step 5: consume messages and produce them to output topics with
transactions.
+ for i := 0; i < count; i++ {
+ // Step 5: the consumer successfully receives messages.
+ message, err := inputConsumer.Receive(ctx)
+ if err != nil {
+ log.Printf("receive message from %s fail, err = %v",
inputTopic, err)
+ continue
+ }
+ // Step 6: create transactions.
+ // The transaction timeout is specified as 10 seconds.
+ // If the transaction is not committed within 10 seconds, the
transaction is automatically aborted.
+ txn, err := client.NewTransaction(10 * time.Second)
+ if err != nil {
+ log.Printf("create txn fail, err = %v", err)
+ continue
+ }
+ // Step 6: you can process the received message with your use
case and business logic.
+ // processMessage(message)
+ // Step 7: the producers produce messages to output topics with
transactions
+ _, err = outputProducerOne.Send(context.Background(),
&pulsar.ProducerMessage{
+ Transaction: txn,
+ Payload: []byte(fmt.Sprintf("Hello Pulsar!
outputTopicOne count : %d", i)),
+ })
+ if err != nil {
+ log.Printf("send to producerOne fail %v", err)
+ txn.Abort(ctx)
+ }
+ _, err = outputProducerTwo.Send(context.Background(),
&pulsar.ProducerMessage{
+ Transaction: txn,
+ Payload: []byte(fmt.Sprintf("Hello Pulsar!
outputTopicTwo count : %d", i)),
+ })
+ if err != nil {
+ log.Printf("send to producerTwo fail %v", err)
+ txn.Abort(ctx)
+ }
+ // Step 7: the consumers acknowledge the input message with the
transactions *individually*.
+ err = inputConsumer.AckWithTxn(message, txn)
+ if err != nil {
+ log.Printf("ack message fail %v", err)
+ txn.Abort(ctx)
+ }
+ // Step 8: commit transactions.
+ err = txn.Commit(ctx)
+ if err != nil {
+ log.Printf("commit txn fail %v", err)
+ }
+ }
+
+ // Final result: consume messages from output topics and print them.
+ for i := 0; i < count; i++ {
+ message, _ := outputConsumerOne.Receive(ctx)
+ log.Printf("Receive transaction message: %s",
string(message.Payload()))
+ }
+ for i := 0; i < count; i++ {
+ message, _ := outputConsumerTwo.Receive(ctx)
+ log.Printf("Receive transaction message: %s",
string(message.Payload()))
+ }
+ ```
+
+</TabItem>
+</Tabs>
+````
+
**Output**
```java
diff --git a/versioned_docs/version-3.1.x/txn-use.md
b/versioned_docs/version-3.1.x/txn-use.md
index 9b19bc7ef542..48cc5a840aaa 100644
--- a/versioned_docs/version-3.1.x/txn-use.md
+++ b/versioned_docs/version-3.1.x/txn-use.md
@@ -49,7 +49,7 @@ Currently, [Pulsar transaction API](/api/admin/) is available
in **Pulsar 2.8.0
Transaction coordinator metadata setup success
```
-3. Create a Pulsar client and enable transactions.
+3. Create a Pulsar client and enable transactions. Because the client needs to
look up the transaction coordinator from a system topic, make sure your client
role has produce and consume permissions on the system namespace `pulsar/system`.
4. Create producers and consumers.
@@ -75,6 +75,13 @@ Currently, [Pulsar transaction API](/api/admin/) is
available in **Pulsar 2.8.0
**Input**
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+ values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
```java
PulsarClient client = PulsarClient.builder()
// Step 3: create a Pulsar client and enable transactions.
@@ -157,6 +164,128 @@ Currently, [Pulsar transaction API](/api/admin/) is
available in **Pulsar 2.8.0
}
```
+</TabItem>
+<TabItem value="Go">
+
+ ```go
+ // Step 3: create a Pulsar client and enable transactions.
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: "<serviceUrl>",
+ EnableTransaction: true,
+ })
+ if err != nil {
+ log.Fatalf("create client fail, err = %v", err)
+ }
+ defer client.Close()
+ // Step 4: create three producers to produce messages to input and
output topics.
+ inputTopic := "inputTopic"
+ outputTopicOne := "outputTopicOne"
+ outputTopicTwo := "outputTopicTwo"
+ subscriptionName := "your-subscription-name"
+ inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: inputTopic,
+ SendTimeout: 0,
+ })
+ defer inputProducer.Close()
+ outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: outputTopicOne,
+ SendTimeout: 0,
+ })
+ defer outputProducerOne.Close()
+ outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: outputTopicTwo,
+ SendTimeout: 0,
+ })
+ defer outputProducerTwo.Close()
+
+ // Step 4: create three consumers to consume messages from input and
output topics.
+ inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: inputTopic,
+ SubscriptionName: subscriptionName,
+ })
+ defer inputConsumer.Close()
+ outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: outputTopicOne,
+ SubscriptionName: subscriptionName,
+ })
+ defer outputConsumerOne.Close()
+ outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: outputTopicTwo,
+ SubscriptionName: subscriptionName,
+ })
+ defer outputConsumerTwo.Close()
+
+ // Step 5: produce messages to input topics.
+ ctx := context.Background()
+ count := 2
+ for i := 0; i < count; i++ {
+ inputProducer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d",
i)),
+ })
+ }
+ // Step 5: consume messages and produce them to output topics with
transactions.
+ for i := 0; i < count; i++ {
+ // Step 5: the consumer successfully receives messages.
+ message, err := inputConsumer.Receive(ctx)
+ if err != nil {
+ log.Printf("receive message from %s fail, err = %v",
inputTopic, err)
+ continue
+ }
+ // Step 6: create transactions.
+ // The transaction timeout is specified as 10 seconds.
+ // If the transaction is not committed within 10 seconds, the
transaction is automatically aborted.
+ txn, err := client.NewTransaction(10 * time.Second)
+ if err != nil {
+ log.Printf("create txn fail, err = %v", err)
+ continue
+ }
+ // Step 6: you can process the received message with your use
case and business logic.
+ // processMessage(message)
+ // Step 7: the producers produce messages to output topics with
transactions
+ _, err = outputProducerOne.Send(context.Background(),
&pulsar.ProducerMessage{
+ Transaction: txn,
+ Payload: []byte(fmt.Sprintf("Hello Pulsar!
outputTopicOne count : %d", i)),
+ })
+ if err != nil {
+ log.Printf("send to producerOne fail %v", err)
+ txn.Abort(ctx)
+ }
+ _, err = outputProducerTwo.Send(context.Background(),
&pulsar.ProducerMessage{
+ Transaction: txn,
+ Payload: []byte(fmt.Sprintf("Hello Pulsar!
outputTopicTwo count : %d", i)),
+ })
+ if err != nil {
+ log.Printf("send to producerTwo fail %v", err)
+ txn.Abort(ctx)
+ }
+ // Step 7: the consumers acknowledge the input message with the
transactions *individually*.
+ err = inputConsumer.AckWithTxn(message, txn)
+ if err != nil {
+ log.Printf("ack message fail %v", err)
+ txn.Abort(ctx)
+ }
+ // Step 8: commit transactions.
+ err = txn.Commit(ctx)
+ if err != nil {
+ log.Printf("commit txn fail %v", err)
+ }
+ }
+
+ // Final result: consume messages from output topics and print them.
+ for i := 0; i < count; i++ {
+ message, _ := outputConsumerOne.Receive(ctx)
+ log.Printf("Receive transaction message: %s",
string(message.Payload()))
+ }
+ for i := 0; i < count; i++ {
+ message, _ := outputConsumerTwo.Receive(ctx)
+ log.Printf("Receive transaction message: %s",
string(message.Payload()))
+ }
+ ```
+
+</TabItem>
+</Tabs>
+````
+
**Output**
```java
diff --git a/versioned_docs/version-3.2.x/txn-use.md
b/versioned_docs/version-3.2.x/txn-use.md
index 6d91283422a5..193a03de4dbb 100644
--- a/versioned_docs/version-3.2.x/txn-use.md
+++ b/versioned_docs/version-3.2.x/txn-use.md
@@ -54,7 +54,7 @@ To use Pulsar transaction API, complete the following steps.
Transaction coordinator metadata setup success
```
-3. Create a Pulsar client and enable transactions.
+3. Create a Pulsar client and enable transactions. Because the client needs to
look up the transaction coordinator from a system topic, make sure your client
role has produce and consume permissions on the system namespace `pulsar/system`.
4. Create producers and consumers.
@@ -80,6 +80,13 @@ To use Pulsar transaction API, complete the following steps.
**Input**
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+ values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
```java
PulsarClient client = PulsarClient.builder()
// Step 3: create a Pulsar client and enable transactions.
@@ -162,6 +169,128 @@ To use Pulsar transaction API, complete the following
steps.
}
```
+</TabItem>
+<TabItem value="Go">
+
+ ```go
+ // Step 3: create a Pulsar client and enable transactions.
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: "<serviceUrl>",
+ EnableTransaction: true,
+ })
+ if err != nil {
+ log.Fatalf("create client fail, err = %v", err)
+ }
+ defer client.Close()
+ // Step 4: create three producers to produce messages to input and
output topics.
+ inputTopic := "inputTopic"
+ outputTopicOne := "outputTopicOne"
+ outputTopicTwo := "outputTopicTwo"
+ subscriptionName := "your-subscription-name"
+ inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: inputTopic,
+ SendTimeout: 0,
+ })
+ defer inputProducer.Close()
+ outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: outputTopicOne,
+ SendTimeout: 0,
+ })
+ defer outputProducerOne.Close()
+ outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: outputTopicTwo,
+ SendTimeout: 0,
+ })
+ defer outputProducerTwo.Close()
+
+ // Step 4: create three consumers to consume messages from input and
output topics.
+ inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: inputTopic,
+ SubscriptionName: subscriptionName,
+ })
+ defer inputConsumer.Close()
+ outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: outputTopicOne,
+ SubscriptionName: subscriptionName,
+ })
+ defer outputConsumerOne.Close()
+ outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: outputTopicTwo,
+ SubscriptionName: subscriptionName,
+ })
+ defer outputConsumerTwo.Close()
+
+ // Step 5: produce messages to input topics.
+ ctx := context.Background()
+ count := 2
+ for i := 0; i < count; i++ {
+ inputProducer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d",
i)),
+ })
+ }
+ // Step 5: consume messages and produce them to output topics with
transactions.
+ for i := 0; i < count; i++ {
+ // Step 5: the consumer successfully receives messages.
+ message, err := inputConsumer.Receive(ctx)
+ if err != nil {
+ log.Printf("receive message from %s fail, err = %v",
inputTopic, err)
+ continue
+ }
+ // Step 6: create transactions.
+ // The transaction timeout is specified as 10 seconds.
+ // If the transaction is not committed within 10 seconds, the
transaction is automatically aborted.
+ txn, err := client.NewTransaction(10 * time.Second)
+ if err != nil {
+ log.Printf("create txn fail, err = %v", err)
+ continue
+ }
+ // Step 6: you can process the received message with your use
case and business logic.
+ // processMessage(message)
+ // Step 7: the producers produce messages to output topics with
transactions
+ _, err = outputProducerOne.Send(context.Background(),
&pulsar.ProducerMessage{
+ Transaction: txn,
+ Payload: []byte(fmt.Sprintf("Hello Pulsar!
outputTopicOne count : %d", i)),
+ })
+ if err != nil {
+ log.Printf("send to producerOne fail %v", err)
+ txn.Abort(ctx)
+ }
+ _, err = outputProducerTwo.Send(context.Background(),
&pulsar.ProducerMessage{
+ Transaction: txn,
+ Payload: []byte(fmt.Sprintf("Hello Pulsar!
outputTopicTwo count : %d", i)),
+ })
+ if err != nil {
+ log.Printf("send to producerTwo fail %v", err)
+ txn.Abort(ctx)
+ }
+ // Step 7: the consumers acknowledge the input message with the
transactions *individually*.
+ err = inputConsumer.AckWithTxn(message, txn)
+ if err != nil {
+ log.Printf("ack message fail %v", err)
+ txn.Abort(ctx)
+ }
+ // Step 8: commit transactions.
+ err = txn.Commit(ctx)
+ if err != nil {
+ log.Printf("commit txn fail %v", err)
+ }
+ }
+
+ // Final result: consume messages from output topics and print them.
+ for i := 0; i < count; i++ {
+ message, _ := outputConsumerOne.Receive(ctx)
+ log.Printf("Receive transaction message: %s",
string(message.Payload()))
+ }
+ for i := 0; i < count; i++ {
+ message, _ := outputConsumerTwo.Receive(ctx)
+ log.Printf("Receive transaction message: %s",
string(message.Payload()))
+ }
+ ```
+
+</TabItem>
+</Tabs>
+````
+
**Output**
```java
diff --git a/versioned_docs/version-3.3.x/txn-use.md
b/versioned_docs/version-3.3.x/txn-use.md
index 6d91283422a5..193a03de4dbb 100644
--- a/versioned_docs/version-3.3.x/txn-use.md
+++ b/versioned_docs/version-3.3.x/txn-use.md
@@ -54,7 +54,7 @@ To use Pulsar transaction API, complete the following steps.
Transaction coordinator metadata setup success
```
-3. Create a Pulsar client and enable transactions.
+3. Create a Pulsar client and enable transactions. Because the client needs to
look up the transaction coordinator from a system topic, make sure your client
role has produce and consume permissions on the system namespace `pulsar/system`.
4. Create producers and consumers.
@@ -80,6 +80,13 @@ To use Pulsar transaction API, complete the following steps.
**Input**
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+ values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
```java
PulsarClient client = PulsarClient.builder()
// Step 3: create a Pulsar client and enable transactions.
@@ -162,6 +169,128 @@ To use Pulsar transaction API, complete the following
steps.
}
```
+</TabItem>
+<TabItem value="Go">
+
+ ```go
+ // Step 3: create a Pulsar client and enable transactions.
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: "<serviceUrl>",
+ EnableTransaction: true,
+ })
+ if err != nil {
+ log.Fatalf("create client fail, err = %v", err)
+ }
+ defer client.Close()
+ // Step 4: create three producers to produce messages to input and
output topics.
+ inputTopic := "inputTopic"
+ outputTopicOne := "outputTopicOne"
+ outputTopicTwo := "outputTopicTwo"
+ subscriptionName := "your-subscription-name"
+ inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: inputTopic,
+ SendTimeout: 0,
+ })
+ defer inputProducer.Close()
+ outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: outputTopicOne,
+ SendTimeout: 0,
+ })
+ defer outputProducerOne.Close()
+ outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: outputTopicTwo,
+ SendTimeout: 0,
+ })
+ defer outputProducerTwo.Close()
+
+ // Step 4: create three consumers to consume messages from input and
output topics.
+ inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: inputTopic,
+ SubscriptionName: subscriptionName,
+ })
+ defer inputConsumer.Close()
+ outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: outputTopicOne,
+ SubscriptionName: subscriptionName,
+ })
+ defer outputConsumerOne.Close()
+ outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: outputTopicTwo,
+ SubscriptionName: subscriptionName,
+ })
+ defer outputConsumerTwo.Close()
+
+ // Step 5: produce messages to input topics.
+ ctx := context.Background()
+ count := 2
+ for i := 0; i < count; i++ {
+ inputProducer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d",
i)),
+ })
+ }
+ // Step 5: consume messages and produce them to output topics with
transactions.
+ for i := 0; i < count; i++ {
+ // Step 5: the consumer successfully receives messages.
+ message, err := inputConsumer.Receive(ctx)
+ if err != nil {
+ log.Printf("receive message from %s fail, err = %v",
inputTopic, err)
+ continue
+ }
+ // Step 6: create transactions.
+ // The transaction timeout is specified as 10 seconds.
+ // If the transaction is not committed within 10 seconds, the
transaction is automatically aborted.
+ txn, err := client.NewTransaction(10 * time.Second)
+ if err != nil {
+ log.Printf("create txn fail, err = %v", err)
+ continue
+ }
+ // Step 6: you can process the received message with your use
case and business logic.
+ // processMessage(message)
+ // Step 7: the producers produce messages to output topics with
transactions
+ _, err = outputProducerOne.Send(context.Background(),
&pulsar.ProducerMessage{
+ Transaction: txn,
+ Payload: []byte(fmt.Sprintf("Hello Pulsar!
outputTopicOne count : %d", i)),
+ })
+ if err != nil {
+ log.Printf("send to producerOne fail %v", err)
+ txn.Abort(ctx)
+ }
+ _, err = outputProducerTwo.Send(context.Background(),
&pulsar.ProducerMessage{
+ Transaction: txn,
+ Payload: []byte(fmt.Sprintf("Hello Pulsar!
outputTopicTwo count : %d", i)),
+ })
+ if err != nil {
+ log.Printf("send to producerTwo fail %v", err)
+ txn.Abort(ctx)
+ }
+ // Step 7: the consumers acknowledge the input message with the
transactions *individually*.
+ err = inputConsumer.AckWithTxn(message, txn)
+ if err != nil {
+ log.Printf("ack message fail %v", err)
+ txn.Abort(ctx)
+ }
+ // Step 8: commit transactions.
+ err = txn.Commit(ctx)
+ if err != nil {
+ log.Printf("commit txn fail %v", err)
+ }
+ }
+
+ // Final result: consume messages from output topics and print them.
+ for i := 0; i < count; i++ {
+ message, _ := outputConsumerOne.Receive(ctx)
+ log.Printf("Receive transaction message: %s",
string(message.Payload()))
+ }
+ for i := 0; i < count; i++ {
+ message, _ := outputConsumerTwo.Receive(ctx)
+ log.Printf("Receive transaction message: %s",
string(message.Payload()))
+ }
+ ```
+
+</TabItem>
+</Tabs>
+````
+
**Output**
```java