thetumbled opened a new pull request, #20356: URL: https://github.com/apache/pulsar/pull/20356
<!-- If the PR belongs to a PIP, please add the PIP link here -->

PIP: #255 https://github.com/apache/pulsar/issues/19744

### Motivation

- Message queues such as Kafka and Pulsar guarantee the exactly-once semantics of their transaction feature only under **the `consume-transform-produce` pattern**, in which a single transaction contains both production and consumption. The operations in the transaction are the production on the sink side and the offset commit on the source side. Because the transaction is atomic, these two operations either both take effect or both do not. The client does not need to know whether the commit succeeded, because the end-to-end state is consistent either way. Consequently, the transaction feature in Kafka and Pulsar supports committing or aborting a transaction only once, and repeating a commit or abort request afterwards is illegal; that is, **commit operations are not idempotent**.
- In many other use cases that differ from the `consume-transform-produce` pattern, we need to know the exact state of the transaction after the commit request is submitted. For example:
  - In the `produce-only` case, the transaction contains only production operations and no offset commit, **which is similar to RocketMQ.**
  - The exactly-once semantics guaranteed by Flink is based on the `Two-Phase Commit` protocol implemented by Flink itself. When connecting to an external system, Flink requires the external system to do the following to ensure exactly-once semantics:
    1. Provide transaction functionality.
    2. **Ensure idempotence** of the transaction commit operation.
    The details can be found at the following link: https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka
- Although Kafka does not support idempotent commit operations, the Flink Kafka connector uses some tricks to achieve **idempotence of the commit operation for the last transaction**, so Flink with Kafka can guarantee exactly-once semantics in most cases, though some risk remains.
- For Pulsar, it is currently impossible to achieve any idempotence of commit operations, because Pulsar's transaction implementation is quite different from Kafka's. I have posted a blog post that analyzes the difference between Pulsar and Kafka: https://blog.csdn.net/m0_43406494/article/details/130344399

<!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. -->

### Modifications

- Provide idempotence of commit operations for transactions in Pulsar. The feature is disabled by default.
- Introduce a `TransactionMetadataPreserver`, a component of the TC (transaction coordinator), to store the metadata of terminated transactions.
- When a `TransactionNotFound` exception is caught, query the `TransactionMetadataPreserver` for the state of the transaction.
- The client attaches its `clientName` to the transaction, and the `TransactionMetadataPreserver` preserves the metadata of up to `TransactionMetaPersistCount` transactions for each client.

### API Changes

- wire protocol change

```
message CommandNewTxn {
    required uint64 request_id = 1;
    optional uint64 txn_ttl_seconds = 2 [default = 0];
    optional uint64 tc_id = 3 [default = 0];
    optional string client_name = 4; // new field
}

message CommandEndTxn {
    required uint64 request_id = 1;
    optional uint64 txnid_least_bits = 2 [default = 0];
    optional uint64 txnid_most_bits = 3 [default = 0];
    optional TxnAction txn_action = 4;
    optional string client_name = 5; // new field
}

message TransactionMetadataEntry {
    ...
    optional string clientName = 13; // new field
}

enum ServerError {
    ...
    TransactionPreserverClosed = 26; // new value: Transaction metadata preserver is closed
}
```

- client configuration change

```
@ApiModelProperty(
        name = "clientName",
        value = "Client name that is used to save transaction metadata."
)
private String clientName;
```

- broker configuration change

```
@FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Max number of txnMeta of aborted transaction to persist in each TC. "
                + "If the number of terminated transaction is greater than this value, the oldest terminated "
                + "transaction will be removed from the cache and persisted in the store. "
                + "Default value is 0, which disables persistence of terminated transaction."
)
private int TransactionMetaPersistCount = 0;

@FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Time in hour to persist the transaction metadata in TransactionMetadataPreserver."
)
private long TransactionMetaPersistTimeInHour = 72;

@FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Interval in seconds to check the expired transaction metadata in TransactionMetadataPreserver."
)
private long TransactionMetaExpireCheckIntervalInSecond = 300;
```

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY.
-->

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: <!-- ENTER URL HERE -->

<!--
After opening this PR, the build in apache/pulsar will fail and instructions will
be provided for opening a PR in the PR author's forked repository.

apache/pulsar pull requests should be first tested in your own fork since the
apache/pulsar CI based on GitHub Actions has constrained resources and quota.
GitHub Actions provides separate quota for pull requests that are executed in
a forked repository.

The tests will be run in the forked repository until all PR review comments have
been handled, the tests pass and the PR is approved by a reviewer.
-->
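
To make the lookup described under Modifications more concrete, here is a minimal sketch of a per-client, bounded store of terminated transaction states. It is not the `TransactionMetadataPreserver` from this PR: the class name, method names, and the use of a single `long` as the transaction ID are all assumptions made for illustration (Pulsar transaction IDs are a most-bits/least-bits pair), and persistence, expiry, and thread safety are left out.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration only; not the class introduced by this PR. */
class TerminatedTxnPreserverSketch {
    enum TxnState { COMMITTED, ABORTED }

    // Plays the role of TransactionMetaPersistCount: entries kept per client; 0 disables the feature.
    private final int maxPerClient;

    // clientName -> (txnId -> terminal state); insertion order lets us evict the oldest entry.
    private final Map<String, LinkedHashMap<Long, TxnState>> byClient = new HashMap<>();

    TerminatedTxnPreserverSketch(int maxPerClient) {
        this.maxPerClient = maxPerClient;
    }

    /** Records the terminal state of a transaction; does nothing when the feature is disabled. */
    void preserve(String clientName, long txnId, TxnState state) {
        if (maxPerClient <= 0) {
            return;
        }
        byClient.computeIfAbsent(clientName, k -> new LinkedHashMap<Long, TxnState>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, TxnState> eldest) {
                // Drop the oldest terminated transaction once the per-client limit is exceeded.
                return size() > maxPerClient;
            }
        }).put(txnId, state);
    }

    /** Returns the preserved terminal state, if this client still has an entry for the transaction. */
    Optional<TxnState> lookup(String clientName, long txnId) {
        Map<Long, TxnState> states = byClient.get(clientName);
        return states == null ? Optional.empty() : Optional.ofNullable(states.get(txnId));
    }

    /**
     * Models the fallback after the coordinator reports that a transaction is not found:
     * a repeated end-txn request counts as an idempotent success only if it matches
     * the preserved outcome.
     */
    boolean isRepeatOfPreservedOutcome(String clientName, long txnId, TxnState requested) {
        return lookup(clientName, txnId).map(s -> s == requested).orElse(false);
    }
}
```

In this sketch, a client that retries a commit for an already committed transaction gets a positive answer from `isRepeatOfPreservedOutcome`, while a retry for a transaction whose entry has been evicted, or one that was aborted, does not. Keying the store by client name is what bounds the retained metadata per client.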
