congbobo184 commented on code in PR #501: URL: https://github.com/apache/pulsar-site/pull/501#discussion_r1159194099
########## docs/txn-use.md: ########## @@ -1,148 +1,170 @@ --- id: txn-use -title: How to use transactions? -sidebar_label: "How to use transactions?" +title: Get started +sidebar_label: "Get started" --- -## Transaction API +Pulsar transaction is primarily a server-side and protocol-level feature. This tutorial guides you through every step of how to use the [Pulsar transaction API](/api/admin/) to send and receive messages in a Java client. -The transaction feature is primarily a server-side and protocol-level feature. You can use the transaction feature via the [transaction API](/api/admin/), which is available in **Pulsar 2.8.0 or later**. +:::note -To use the transaction API, you do not need any additional settings in the Pulsar client. **By default**, transactions are **disabled**. +Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0 or later** versions. It is only available for **Java** clients. -Currently, transaction API is only available for **Java** clients. Support for other language clients will be added in future releases. +::: +## Prerequisites -## Quick start +- [Start Pulsar 2.8.0 or later versions](#getting-started-standalone.md) -This section provides an example of how to use the transaction API to send and receive messages in a Java client. +## Steps -1. Start Pulsar 2.8.0 or later. +1. Enable transactions. You can set the following configurations in the [`broker.conf`](https://github.com/apache/pulsar/blob/master/conf/broker.conf) or [`standalone.conf`](https://github.com/apache/pulsar/blob/master/conf/standalone.conf) file. -2. Enable transaction. + ```conf + //mandatory configuration, used to enable transaction coordinator + transactionCoordinatorEnabled=true - Change the configuration in the `broker.conf` or `standalone.conf` file. + //mandatory configuration, used to create systemTopic used for transaction buffer snapshot + systemTopicEnabled=true + ``` - ```conf - //mandatory configuration, used to enable transaction coordinator - transactionCoordinatorEnabled=true + :::note - //mandtory configuration, used to create systemTopic used for transaction buffer snapshot - systemTopicEnabled=true - ``` - - * If you want to acknowledge batch messages in transactions, set `acknowledgmentAtBatchIndexLevelEnabled` to `true` in the `broker.conf` or `standalone.conf` file. - - ```conf - acknowledgmentAtBatchIndexLevelEnabled=true - ``` - - * If you want to guarantee exactly-once semantics, you need to enable [message deduplication](cookbooks-deduplication.md). - You can enable message deduplication at the broker level, the namespace level, or the topic level according to your needs. - - -3. Initialize transaction coordinator metadata. - - The transaction coordinator can leverage the advantages of partitioned topics (such as load balance). - - **Input** - - ```shell - bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone - ``` - - **Output** - - ```shell - Transaction coordinator metadata setup success - ``` - -4. Initialize a Pulsar client. - - ```shell - PulsarClient client = PulsarClient.builder() - .serviceUrl("pulsar://localhost:6650") - .enableTransaction(true) - .build(); - ``` - -Now you can start using the transaction API to send and receive messages. Below is an example of a `consume-process-produce` application written in Java. - - - -Let's walk through this example step by step. - -| Step | Description | -| --- | --- | -| 1. Start a transaction. | The application opens a new transaction by calling PulsarClient.newTransaction. It specifics the transaction timeout as 1 minute. If the transaction is not committed within 1 minute, the transaction is automatically aborted. | -| 2. Receive messages from topics. | The application creates two normal consumers to receive messages from topic input-topic-1 and input-topic-2 respectively. | -| 3. Publish messages to topics with the transaction. | The application creates two producers to produce the resulting messages to the output topic _output-topic-1_ and output-topic-2 respectively. The application applies the processing logic and generates two output messages. The application sends those two output messages as part of the transaction opened in the first step via `Producer.newMessage(Transaction)`. | -| 4. Acknowledge the messages with the transaction. | In the same transaction, the application acknowledges the two input messages. | -| 5. Commit the transaction. | The application commits the transaction by calling `Transaction.commit()` on the open transaction. The commit operation ensures the two input messages are marked as acknowledged and the two output messages are written successfully to the output topics. | - -[1] Example of enabling batch messages ack in transactions in the consumer builder. - -```java -Consumer<byte[]> consumer = pulsarClient - .newConsumer() - .topic(transferTopic) - .subscriptionName("transaction-sub") - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) // enable batch index acknowledgment - .subscribe(); -``` - -[2] Example of using transactions to ack messages individually -```java -// resource prepare -String sourceTopicName = "persistent://" + NAMESPACE1 + "/sourceTopic"; -String sinkTopicName = "persistent://" + NAMESPACE1 + "/sinkTopic"; -String subName = "shared-subscription"; -String producerName = "txn-message-producer"; -try { - @Cleanup - Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING) - .topic(sinkTopicName) - .producerName(producerName) - .create(); - @Cleanup - Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING) - .topic(sourceTopicName) - .subscriptionName(subName) - .subscriptionType(SubscriptionType.Shared) - .subscribe(); - Message<String> message = null; - Transaction transaction = null; - while (true) { - try { - message = sourceConsumer.receive(); - //Open a transaction to handle the received message - transaction = pulsarClient.newTransaction() - .withTransactionTimeout(5, TimeUnit.SECONDS) - .build() - .get(); - //Do some things there - //Send message to another topic - sinkProducer.newMessage(transaction) - .value("handle message " + message.getValue()) - .send(); - //Ack the message that has been consumed - sourceConsumer.acknowledgeAsync(message.getMessageId(), transaction).get(); - //Commit the transaction - transaction.commit().get(); - } catch (ExecutionException e) { - Throwable exception = e.getCause(); - if (!(exception instanceof PulsarClientException.TransactionConflictException)) { - //The message may not be handled, so we need to redeliver it - sourceConsumer.negativeAcknowledge(message); - } - if (!(exception instanceof TransactionCoordinatorClientException.TransactionNotFoundException) && transaction !=null) { - //Abort the transaction if there is an exception and the transaction is not end. - transaction.abort().get(); - } - } - } -} catch (Exception e) { - log.error("Catch Exception", e); -} -``` + **By default**, Pulsar transactions are **disabled**. + + ::: + +2. Initialize transaction coordinator metadata. + + The transaction coordinator can leverage the advantages of partitioned topics (such as load balance). + + **Input** + + ```shell + bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone + ``` + + **Output** + + ```shell + Transaction coordinator metadata setup success + ``` + +3. Create a Pulsar client and enable transactions. + +4. Create producers and consumers. + +5. Produce and receive messages. + +6. Create transactions. + +7. Produce and ack messages with transactions. + + :::note + + Currently, messages can be acked individually rather than cumulatively. + + ::: + +8. End transactions. + + :::tip + + The code snippet below is the example for step 3 - step 8. + + ::: + + **Input** + + ```java + PulsarClient client = PulsarClient.builder() + // Step 3: create a Pulsar client and enable transactions. + .enableTransaction(true) + .serviceUrl(jct.serviceUrl) + + // Step 4: create three producers to produce messages to input and output topics. + ProducerBuilder<String> producerBuilder = client.newProducer(Schema.STRING); + Producer<String> inputProducer = producerBuilder.topic(inputTopic) + .sendTimeout(0, TimeUnit.SECONDS).create(); + Producer<String> outputProducerOne = producerBuilder.topic(outputTopicOne) + .sendTimeout(0, TimeUnit.SECONDS).create(); + Producer<String> outputProducerTwo = producerBuilder.topic(outputTopicTwo) + .sendTimeout(0, TimeUnit.SECONDS).create(); + // Step 4: create three consumers to consume messages from input and output topics. + Consumer<String> inputConsumer = client.newConsumer(Schema.STRING) + .subscriptionName("your-subscription-name").topic(inputTopic).subscribe(); + Consumer<String> outputConsumerOne = client.newConsumer(Schema.STRING) + .subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe(); + Consumer<String> outputConsumerTwo = client.newConsumer(Schema.STRING) + .subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe(); + + int count = 2; + // Step 5: produce messages to input topics. + for (int i = 0; i < count; i++) { + inputProducer.send("Hello Pulsar! count : " + i); + } + + // Step 5: consume messages and produce them to output topics with transactions. + for (int i = 0; i < count; i++) { + + // Step 5: the consumer successfully receives messages. + Message<String> message = inputConsumer.receive(); + + // Step 6: create transactions. + // The transaction timeout is specified as 10 seconds. + // If the transaction is not committed within 10 seconds, the transaction is automatically aborted. + Transaction txn = null; + try { + txn = client.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + // Step 6: you can process the received message with your use case and business logic. + + // Step 7: the producers produce messages to output topics with transactions + outputProducerOne.newMessage(txn).value("Hello Pulsar! outputTopicOne count : " + i).send(); + outputProducerTwo.newMessage(txn).value("Hello Pulsar! outputTopicTwo count : " + i).send(); + + // Step 7: the consumers acknowledge the input message with the transactions *individually*. + inputConsumer.acknowledgeAsync(message.getMessageId(), txn).get(); + // Step 8: commit transactions. + txn.commit(); + } catch (ExecutionException e) { + if (!(e.getCause() instanceof PulsarClientException.TransactionConflictException)) { + // If TransactionConflictException is not thrown, + // you need to redeliver or negativeAcknowledge this message, + // or else this message will not be received. Review Comment: ```suggestion // or else this message will not be received again. ``` ########## docs/txn-use.md: ########## @@ -1,148 +1,170 @@ --- id: txn-use -title: How to use transactions? -sidebar_label: "How to use transactions?" +title: Get started +sidebar_label: "Get started" --- -## Transaction API +Pulsar transaction is primarily a server-side and protocol-level feature. This tutorial guides you through every step of how to use the [Pulsar transaction API](/api/admin/) to send and receive messages in a Java client. -The transaction feature is primarily a server-side and protocol-level feature. You can use the transaction feature via the [transaction API](/api/admin/), which is available in **Pulsar 2.8.0 or later**. +:::note -To use the transaction API, you do not need any additional settings in the Pulsar client. **By default**, transactions are **disabled**. +Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0 or later** versions. It is only available for **Java** clients. -Currently, transaction API is only available for **Java** clients. Support for other language clients will be added in future releases. +::: +## Prerequisites -## Quick start +- [Start Pulsar 2.8.0 or later versions](#getting-started-standalone.md) -This section provides an example of how to use the transaction API to send and receive messages in a Java client. +## Steps -1. Start Pulsar 2.8.0 or later. +1. Enable transactions. You can set the following configurations in the [`broker.conf`](https://github.com/apache/pulsar/blob/master/conf/broker.conf) or [`standalone.conf`](https://github.com/apache/pulsar/blob/master/conf/standalone.conf) file. -2. Enable transaction. + ```conf + //mandatory configuration, used to enable transaction coordinator + transactionCoordinatorEnabled=true - Change the configuration in the `broker.conf` or `standalone.conf` file. + //mandatory configuration, used to create systemTopic used for transaction buffer snapshot + systemTopicEnabled=true + ``` - ```conf - //mandatory configuration, used to enable transaction coordinator - transactionCoordinatorEnabled=true + :::note - //mandtory configuration, used to create systemTopic used for transaction buffer snapshot - systemTopicEnabled=true - ``` - - * If you want to acknowledge batch messages in transactions, set `acknowledgmentAtBatchIndexLevelEnabled` to `true` in the `broker.conf` or `standalone.conf` file. - - ```conf - acknowledgmentAtBatchIndexLevelEnabled=true - ``` - - * If you want to guarantee exactly-once semantics, you need to enable [message deduplication](cookbooks-deduplication.md). - You can enable message deduplication at the broker level, the namespace level, or the topic level according to your needs. - - -3. Initialize transaction coordinator metadata. - - The transaction coordinator can leverage the advantages of partitioned topics (such as load balance). - - **Input** - - ```shell - bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone - ``` - - **Output** - - ```shell - Transaction coordinator metadata setup success - ``` - -4. Initialize a Pulsar client. - - ```shell - PulsarClient client = PulsarClient.builder() - .serviceUrl("pulsar://localhost:6650") - .enableTransaction(true) - .build(); - ``` - -Now you can start using the transaction API to send and receive messages. Below is an example of a `consume-process-produce` application written in Java. - - - -Let's walk through this example step by step. - -| Step | Description | -| --- | --- | -| 1. Start a transaction. | The application opens a new transaction by calling PulsarClient.newTransaction. It specifics the transaction timeout as 1 minute. If the transaction is not committed within 1 minute, the transaction is automatically aborted. | -| 2. Receive messages from topics. | The application creates two normal consumers to receive messages from topic input-topic-1 and input-topic-2 respectively. | -| 3. Publish messages to topics with the transaction. | The application creates two producers to produce the resulting messages to the output topic _output-topic-1_ and output-topic-2 respectively. The application applies the processing logic and generates two output messages. The application sends those two output messages as part of the transaction opened in the first step via `Producer.newMessage(Transaction)`. | -| 4. Acknowledge the messages with the transaction. | In the same transaction, the application acknowledges the two input messages. | -| 5. Commit the transaction. | The application commits the transaction by calling `Transaction.commit()` on the open transaction. The commit operation ensures the two input messages are marked as acknowledged and the two output messages are written successfully to the output topics. | - -[1] Example of enabling batch messages ack in transactions in the consumer builder. - -```java -Consumer<byte[]> consumer = pulsarClient - .newConsumer() - .topic(transferTopic) - .subscriptionName("transaction-sub") - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) // enable batch index acknowledgment - .subscribe(); -``` - -[2] Example of using transactions to ack messages individually -```java -// resource prepare -String sourceTopicName = "persistent://" + NAMESPACE1 + "/sourceTopic"; -String sinkTopicName = "persistent://" + NAMESPACE1 + "/sinkTopic"; -String subName = "shared-subscription"; -String producerName = "txn-message-producer"; -try { - @Cleanup - Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING) - .topic(sinkTopicName) - .producerName(producerName) - .create(); - @Cleanup - Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING) - .topic(sourceTopicName) - .subscriptionName(subName) - .subscriptionType(SubscriptionType.Shared) - .subscribe(); - Message<String> message = null; - Transaction transaction = null; - while (true) { - try { - message = sourceConsumer.receive(); - //Open a transaction to handle the received message - transaction = pulsarClient.newTransaction() - .withTransactionTimeout(5, TimeUnit.SECONDS) - .build() - .get(); - //Do some things there - //Send message to another topic - sinkProducer.newMessage(transaction) - .value("handle message " + message.getValue()) - .send(); - //Ack the message that has been consumed - sourceConsumer.acknowledgeAsync(message.getMessageId(), transaction).get(); - //Commit the transaction - transaction.commit().get(); - } catch (ExecutionException e) { - Throwable exception = e.getCause(); - if (!(exception instanceof PulsarClientException.TransactionConflictException)) { - //The message may not be handled, so we need to redeliver it - sourceConsumer.negativeAcknowledge(message); - } - if (!(exception instanceof TransactionCoordinatorClientException.TransactionNotFoundException) && transaction !=null) { - //Abort the transaction if there is an exception and the transaction is not end. - transaction.abort().get(); - } - } - } -} catch (Exception e) { - log.error("Catch Exception", e); -} -``` + **By default**, Pulsar transactions are **disabled**. + + ::: + +2. Initialize transaction coordinator metadata. + + The transaction coordinator can leverage the advantages of partitioned topics (such as load balance). + + **Input** + + ```shell + bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone + ``` + + **Output** + + ```shell + Transaction coordinator metadata setup success + ``` + +3. Create a Pulsar client and enable transactions. + +4. Create producers and consumers. + +5. Produce and receive messages. + +6. Create transactions. + +7. Produce and ack messages with transactions. + + :::note + + Currently, messages can be acked individually rather than cumulatively. + + ::: + +8. End transactions. + + :::tip + + The code snippet below is the example for step 3 - step 8. + + ::: + + **Input** + + ```java + PulsarClient client = PulsarClient.builder() + // Step 3: create a Pulsar client and enable transactions. + .enableTransaction(true) + .serviceUrl(jct.serviceUrl) + + // Step 4: create three producers to produce messages to input and output topics. + ProducerBuilder<String> producerBuilder = client.newProducer(Schema.STRING); + Producer<String> inputProducer = producerBuilder.topic(inputTopic) + .sendTimeout(0, TimeUnit.SECONDS).create(); + Producer<String> outputProducerOne = producerBuilder.topic(outputTopicOne) + .sendTimeout(0, TimeUnit.SECONDS).create(); + Producer<String> outputProducerTwo = producerBuilder.topic(outputTopicTwo) + .sendTimeout(0, TimeUnit.SECONDS).create(); + // Step 4: create three consumers to consume messages from input and output topics. + Consumer<String> inputConsumer = client.newConsumer(Schema.STRING) + .subscriptionName("your-subscription-name").topic(inputTopic).subscribe(); + Consumer<String> outputConsumerOne = client.newConsumer(Schema.STRING) + .subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe(); + Consumer<String> outputConsumerTwo = client.newConsumer(Schema.STRING) + .subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe(); + + int count = 2; + // Step 5: produce messages to input topics. + for (int i = 0; i < count; i++) { + inputProducer.send("Hello Pulsar! count : " + i); + } + + // Step 5: consume messages and produce them to output topics with transactions. + for (int i = 0; i < count; i++) { + + // Step 5: the consumer successfully receives messages. + Message<String> message = inputConsumer.receive(); + + // Step 6: create transactions. + // The transaction timeout is specified as 10 seconds. + // If the transaction is not committed within 10 seconds, the transaction is automatically aborted. + Transaction txn = null; + try { + txn = client.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + // Step 6: you can process the received message with your use case and business logic. + + // Step 7: the producers produce messages to output topics with transactions + outputProducerOne.newMessage(txn).value("Hello Pulsar! outputTopicOne count : " + i).send(); + outputProducerTwo.newMessage(txn).value("Hello Pulsar! outputTopicTwo count : " + i).send(); + + // Step 7: the consumers acknowledge the input message with the transactions *individually*. + inputConsumer.acknowledgeAsync(message.getMessageId(), txn).get(); + // Step 8: commit transactions. + txn.commit(); Review Comment: txn.commit().get(); ########## docs/txn-advanced-features.md: ########## @@ -0,0 +1,45 @@ +--- +id: txn-advanced-features +title: Advanced features +sidebar_label: "Advanced features" +--- + +You can use the following advanced features with transactions in Pulsar. + +## Ack batch messages + +If you want to acknowledge batch messages with transactions, set `acknowledgmentAtBatchIndexLevelEnabled` to `true` in the [`broker.conf`](https://github.com/apache/pulsar/blob/master/conf/broker.conf) or [`standalone.conf`](https://github.com/apache/pulsar/blob/master/conf/standalone.conf) file. + + +```conf +acknowledgmentAtBatchIndexLevelEnabled=true +``` + +This example enables batch messages ack in transactions in the consumer builder. + +```java +Consumer<byte[]> consumer = pulsarClient + .newConsumer() + .topic(transferTopic) + .subscriptionName("transaction-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) // enable batch index acknowledgment + .subscribe(); +``` + +## Enable authentication Review Comment: I think it is better to directly configure the link of auth, like: https://pulsar.apache.org/docs/2.11.x/Authentication, but now don't have the link of this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
