This is an automated email from the ASF dual-hosted git repository.
liuyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git
The following commit(s) were added to refs/heads/main by this push:
new dfaf6f311f0 [feat][doc] Add Get Started for TXN (#501)
dfaf6f311f0 is described below
commit dfaf6f311f026a09cb5263401238ab36e5058ee8
Author: Anonymitaet <[email protected]>
AuthorDate: Fri Apr 7 12:20:15 2023 +0800
[feat][doc] Add Get Started for TXN (#501)
---
docs/txn-advanced-features.md | 45 +++++++
docs/txn-how.md | 2 +-
docs/txn-monitor.md | 2 +-
docs/txn-use.md | 290 +++++++++++++++++++++++-------------------
docs/txn-what.md | 2 +-
sidebars.json | 5 +-
6 files changed, 207 insertions(+), 139 deletions(-)
diff --git a/docs/txn-advanced-features.md b/docs/txn-advanced-features.md
new file mode 100644
index 00000000000..c2f5c6f26b7
--- /dev/null
+++ b/docs/txn-advanced-features.md
@@ -0,0 +1,45 @@
+---
+id: txn-advanced-features
+title: Advanced features
+sidebar_label: "Advanced features"
+---
+
+You can use the following advanced features with transactions in Pulsar.
+
+## Ack batch messages
+
+If you want to acknowledge batch messages with transactions, set
`acknowledgmentAtBatchIndexLevelEnabled` to `true` in the
[`broker.conf`](https://github.com/apache/pulsar/blob/master/conf/broker.conf)
or
[`standalone.conf`](https://github.com/apache/pulsar/blob/master/conf/standalone.conf)
file.
+
+
+```conf
+acknowledgmentAtBatchIndexLevelEnabled=true
+```
+
+This example enables batch messages ack in transactions in the consumer
builder.
+
+```java
+Consumer<byte[]> consumer = pulsarClient
+ .newConsumer()
+ .topic(transferTopic)
+ .subscriptionName("transaction-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Shared)
+ .enableBatchIndexAcknowledgment(true) // enable batch index acknowledgment
+ .subscribe();
+```
+
+## Enable authentication
+
+If you want to enable authentication with transactions, follow the steps below.
+
+1. [Grant "consume" permission](admin-api-topics.md#grant-permission) to the
`persistent://pulsar/system/transaction_coordinator_assign` topic.
+
+2. [Configure authentication](security-overview/#authentication) in a Pulsar
client.
+
+## Guarantee exactly-once semantics
+
+If you want to guarantee exactly-once semantics with transactions, you can
[enable message deduplication at the broker, namespace, or topic
level](cookbooks-deduplication.md#enable-message-deduplication-at-namespace-or-topic-level).
+
+## Related topics
+
+- To get up quickly, see [Pulsar transactions - Get started](txn-use.md).
\ No newline at end of file
diff --git a/docs/txn-how.md b/docs/txn-how.md
index b34020cf426..feae8c4cb01 100644
--- a/docs/txn-how.md
+++ b/docs/txn-how.md
@@ -1,7 +1,7 @@
---
id: txn-how
title: How transactions work?
-sidebar_label: "How transactions work?"
+sidebar_label: "Working principles"
---
This section describes transaction components and how the components work
together. For the complete design details, see [PIP-31: Transactional
Streaming](https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx).
diff --git a/docs/txn-monitor.md b/docs/txn-monitor.md
index f317db80adb..a9e2dd38dca 100644
--- a/docs/txn-monitor.md
+++ b/docs/txn-monitor.md
@@ -1,7 +1,7 @@
---
id: txn-monitor
title: How to monitor transactions?
-sidebar_label: "How to monitor transactions?"
+sidebar_label: "Monitoring"
---
You can monitor the status of the transactions in Prometheus and Grafana using
the [transaction metrics](/reference-metrics.md#pulsar-transaction).
diff --git a/docs/txn-use.md b/docs/txn-use.md
index 69c7590914c..08f5baa163c 100644
--- a/docs/txn-use.md
+++ b/docs/txn-use.md
@@ -1,148 +1,170 @@
---
id: txn-use
-title: How to use transactions?
-sidebar_label: "How to use transactions?"
+title: Get started
+sidebar_label: "Get started"
---
-## Transaction API
+Pulsar transaction is primarily a server-side and protocol-level feature. This
tutorial guides you through every step of how to use the [Pulsar transaction
API](/api/admin/) to send and receive messages in a Java client.
-The transaction feature is primarily a server-side and protocol-level feature.
You can use the transaction feature via the [transaction API](/api/admin/),
which is available in **Pulsar 2.8.0 or later**.
+:::note
-To use the transaction API, you do not need any additional settings in the
Pulsar client. **By default**, transactions are **disabled**.
+Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar
2.8.0 or later** versions. It is only available for **Java** clients.
-Currently, transaction API is only available for **Java** clients. Support for
other language clients will be added in future releases.
+:::
+## Prerequisites
-## Quick start
+- [Start Pulsar 2.8.0 or later versions](#getting-started-standalone.md)
-This section provides an example of how to use the transaction API to send and
receive messages in a Java client.
+## Steps
-1. Start Pulsar 2.8.0 or later.
+1. Enable transactions. You can set the following configurations in the
[`broker.conf`](https://github.com/apache/pulsar/blob/master/conf/broker.conf)
or
[`standalone.conf`](https://github.com/apache/pulsar/blob/master/conf/standalone.conf)
file.
-2. Enable transaction.
+ ```conf
+ //mandatory configuration, used to enable transaction coordinator
+ transactionCoordinatorEnabled=true
- Change the configuration in the `broker.conf` or `standalone.conf` file.
+ //mandatory configuration, used to create systemTopic used for transaction
buffer snapshot
+ systemTopicEnabled=true
+ ```
- ```conf
- //mandatory configuration, used to enable transaction coordinator
- transactionCoordinatorEnabled=true
+ :::note
- //mandtory configuration, used to create systemTopic used for transaction
buffer snapshot
- systemTopicEnabled=true
- ```
-
- * If you want to acknowledge batch messages in transactions, set
`acknowledgmentAtBatchIndexLevelEnabled` to `true` in the `broker.conf` or
`standalone.conf` file.
-
- ```conf
- acknowledgmentAtBatchIndexLevelEnabled=true
- ```
-
- * If you want to guarantee exactly-once semantics, you need to enable
[message deduplication](cookbooks-deduplication.md).
- You can enable message deduplication at the broker level, the namespace
level, or the topic level according to your needs.
-
-
-3. Initialize transaction coordinator metadata.
-
- The transaction coordinator can leverage the advantages of partitioned
topics (such as load balance).
-
- **Input**
-
- ```shell
- bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181
-c standalone
- ```
-
- **Output**
-
- ```shell
- Transaction coordinator metadata setup success
- ```
-
-4. Initialize a Pulsar client.
-
- ```shell
- PulsarClient client = PulsarClient.builder()
- .serviceUrl("pulsar://localhost:6650")
- .enableTransaction(true)
- .build();
- ```
-
-Now you can start using the transaction API to send and receive messages.
Below is an example of a `consume-process-produce` application written in Java.
-
-
-
-Let's walk through this example step by step.
-
-| Step | Description |
-| --- | --- |
-| 1. Start a transaction. | The application opens a new transaction by
calling PulsarClient.newTransaction. It specifics the transaction timeout as 1
minute. If the transaction is not committed within 1 minute, the transaction is
automatically aborted. |
-| 2. Receive messages from topics. | The application creates two normal
consumers to receive messages from topic input-topic-1 and input-topic-2
respectively. |
-| 3. Publish messages to topics with the transaction. | The application
creates two producers to produce the resulting messages to the output topic
_output-topic-1_ and output-topic-2 respectively. The application applies the
processing logic and generates two output messages. The application sends those
two output messages as part of the transaction opened in the first step via
`Producer.newMessage(Transaction)`. |
-| 4. Acknowledge the messages with the transaction. | In the same
transaction, the application acknowledges the two input messages. |
-| 5. Commit the transaction. | The application commits the transaction by
calling `Transaction.commit()` on the open transaction. The commit operation
ensures the two input messages are marked as acknowledged and the two output
messages are written successfully to the output topics. |
-
-[1] Example of enabling batch messages ack in transactions in the consumer
builder.
-
-```java
-Consumer<byte[]> consumer = pulsarClient
- .newConsumer()
- .topic(transferTopic)
- .subscriptionName("transaction-sub")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscriptionType(SubscriptionType.Shared)
- .enableBatchIndexAcknowledgment(true) // enable batch index acknowledgment
- .subscribe();
-```
-
-[2] Example of using transactions to ack messages individually
-```java
-// resource prepare
-String sourceTopicName = "persistent://" + NAMESPACE1 + "/sourceTopic";
-String sinkTopicName = "persistent://" + NAMESPACE1 + "/sinkTopic";
-String subName = "shared-subscription";
-String producerName = "txn-message-producer";
-try {
- @Cleanup
- Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING)
- .topic(sinkTopicName)
- .producerName(producerName)
- .create();
- @Cleanup
- Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING)
- .topic(sourceTopicName)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- Message<String> message = null;
- Transaction transaction = null;
- while (true) {
- try {
- message = sourceConsumer.receive();
- //Open a transaction to handle the received message
- transaction = pulsarClient.newTransaction()
- .withTransactionTimeout(5, TimeUnit.SECONDS)
- .build()
- .get();
- //Do some things there
- //Send message to another topic
- sinkProducer.newMessage(transaction)
- .value("handle message " + message.getValue())
- .send();
- //Ack the message that has been consumed
- sourceConsumer.acknowledgeAsync(message.getMessageId(),
transaction).get();
- //Commit the transaction
- transaction.commit().get();
- } catch (ExecutionException e) {
- Throwable exception = e.getCause();
- if (!(exception instanceof
PulsarClientException.TransactionConflictException)) {
- //The message may not be handled, so we need to redeliver it
- sourceConsumer.negativeAcknowledge(message);
- }
- if (!(exception instanceof
TransactionCoordinatorClientException.TransactionNotFoundException) &&
transaction !=null) {
- //Abort the transaction if there is an exception and the
transaction is not end.
- transaction.abort().get();
- }
- }
- }
-} catch (Exception e) {
- log.error("Catch Exception", e);
-}
-```
+ **By default**, Pulsar transactions are **disabled**.
+
+ :::
+
+2. Initialize transaction coordinator metadata.
+
+ The transaction coordinator can leverage the advantages of partitioned
topics (such as load balance).
+
+ **Input**
+
+ ```shell
+ bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181
-c standalone
+ ```
+
+ **Output**
+
+ ```shell
+ Transaction coordinator metadata setup success
+ ```
+
+3. Create a Pulsar client and enable transactions.
+
+4. Create producers and consumers.
+
+5. Produce and receive messages.
+
+6. Create transactions.
+
+7. Produce and ack messages with transactions.
+
+ :::note
+
+ Currently, messages can be acked individually rather than cumulatively.
+
+ :::
+
+8. End transactions.
+
+ :::tip
+
+ The code snippet below is the example for step 3 - step 8.
+
+ :::
+
+ **Input**
+
+ ```java
+ PulsarClient client = PulsarClient.builder()
+ // Step 3: create a Pulsar client and enable transactions.
+ .enableTransaction(true)
+ .serviceUrl(jct.serviceUrl)
+
+ // Step 4: create three producers to produce messages to input and
output topics.
+ ProducerBuilder<String> producerBuilder =
client.newProducer(Schema.STRING);
+ Producer<String> inputProducer = producerBuilder.topic(inputTopic)
+ .sendTimeout(0, TimeUnit.SECONDS).create();
+ Producer<String> outputProducerOne =
producerBuilder.topic(outputTopicOne)
+ .sendTimeout(0, TimeUnit.SECONDS).create();
+ Producer<String> outputProducerTwo =
producerBuilder.topic(outputTopicTwo)
+ .sendTimeout(0, TimeUnit.SECONDS).create();
+ // Step 4: create three consumers to consume messages from input
and output topics.
+ Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
+
.subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
+ Consumer<String> outputConsumerOne =
client.newConsumer(Schema.STRING)
+
.subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
+ Consumer<String> outputConsumerTwo =
client.newConsumer(Schema.STRING)
+
.subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
+
+ int count = 2;
+ // Step 5: produce messages to input topics.
+ for (int i = 0; i < count; i++) {
+ inputProducer.send("Hello Pulsar! count : " + i);
+ }
+
+ // Step 5: consume messages and produce them to output topics with
transactions.
+ for (int i = 0; i < count; i++) {
+
+ // Step 5: the consumer successfully receives messages.
+ Message<String> message = inputConsumer.receive();
+
+ // Step 6: create transactions.
+ // The transaction timeout is specified as 10 seconds.
+ // If the transaction is not committed within 10 seconds, the
transaction is automatically aborted.
+ Transaction txn = null;
+ try {
+ txn = client.newTransaction()
+ .withTransactionTimeout(10,
TimeUnit.SECONDS).build().get();
+ // Step 6: you can process the received message with your
use case and business logic.
+
+ // Step 7: the producers produce messages to output topics
with transactions
+ outputProducerOne.newMessage(txn).value("Hello Pulsar!
outputTopicOne count : " + i).send();
+ outputProducerTwo.newMessage(txn).value("Hello Pulsar!
outputTopicTwo count : " + i).send();
+
+ // Step 7: the consumers acknowledge the input message
with the transactions *individually*.
+ inputConsumer.acknowledgeAsync(message.getMessageId(),
txn).get();
+ // Step 8: commit transactions.
+ txn.commit().get();
+ } catch (ExecutionException e) {
+ if (!(e.getCause() instanceof
PulsarClientException.TransactionConflictException)) {
+ // If TransactionConflictException is not thrown,
+ // you need to redeliver or negativeAcknowledge this
message,
+ // or else this message will not be received again.
+ inputConsumer.negativeAcknowledge(message);
+ }
+
+ // If a new transaction is created,
+ // then the old transaction should be aborted.
+ if (txn != null) {
+ txn.abort();
+ }
+ }
+ }
+
+ // Final result: consume messages from output topics and print
them.
+ for (int i = 0; i < count; i++) {
+ Message<String> message = outputConsumerOne.receive();
+ System.out.println("Receive transaction message: " +
message.getValue());
+ }
+
+ for (int i = 0; i < count; i++) {
+ Message<String> message = outputConsumerTwo.receive();
+ System.out.println("Receive transaction message: " +
message.getValue());
+ }
+ }
+ }
+ ```
+
+ **Output**
+
+ ```java
+ Receive transaction message: Hello Pulsar! count : 1
+ Receive transaction message: Hello Pulsar! count : 2
+ Receive transaction message: Hello Pulsar! count : 1
+ Receive transaction message: Hello Pulsar! count : 2
+ ```
+
+## Related topics
+
+- To learn more features that can be used with transactions, see [Pulsar
transactions - Advanced features](txn-advanced-features.md).
\ No newline at end of file
diff --git a/docs/txn-what.md b/docs/txn-what.md
index 8b77be4fdd4..aaf832f9613 100644
--- a/docs/txn-what.md
+++ b/docs/txn-what.md
@@ -1,7 +1,7 @@
---
id: txn-what
title: What are transactions?
-sidebar_label: "What are transactions?"
+sidebar_label: "Concept"
---
Transactions strengthen the message delivery semantics of Apache Pulsar and
[processing guarantees of Pulsar
Functions](functions-concepts.md#processing-guarantees-and-subscription-types).
The Pulsar Transaction API supports atomic writes and acknowledgments across
multiple topics.
diff --git a/sidebars.json b/sidebars.json
index d5724f7a9f4..f10d5f7ae0c 100644
--- a/sidebars.json
+++ b/sidebars.json
@@ -204,9 +204,10 @@
"items": [
"txn-why",
"txn-what",
- "txn-how",
"txn-use",
- "txn-monitor"
+ "txn-advanced-features",
+ "txn-monitor",
+ "txn-how"
]
},
{