Hi, thanks for starting this discussion. The formatting is broken, though, which makes it hard to read. Could you please resend a well-formatted version, or paste a link to a doc?
Regards

On Thu, Sep 29, 2022 at 7:23 PM 柳尘 <yuluoxinsh...@gmail.com> wrote:

> *Status*
>
> - Current State: Discussing
> - Authors: [complone](github.com/complone)
> - Shepherds:
> - Mailing List discussion: dev@rocketmq.apache.org
> - Pull Request:
> - Released: no
>
> *Background & Motivation*
>
> *What do we need to do*
>
> - Will we add a new module? yes
> - Will we add new APIs? yes
> - Will we add a new feature? yes
>
> *Why should we do that*
>
> - Are there any problems of our current project?
>
> At present, when RocketMQ produces messages, the network call itself
> is uncertain (a send can be left in the so-called processing state),
> so messages may be duplicated. Many other MQ products have the same
> problem. The usual approach is to ask consumers to deduplicate
> messages when consuming them. The idempotency key should be generated
> by the message producer: when sending a message, we can set an ID as
> the message's key. The corresponding API is
> ```org.apache.rocketmq.common.message.Message.setKeys(String keys)```.
>
> - What can we benefit from the proposed changes?
>
> We introduce the notion of a TransactionalId to let users uniquely
> identify producers in a persistent way. Different instances of a
> producer with the same TransactionalId will be able to resume (or
> abort) any transactions started by the previous instance.
>
> We introduce the notion of a producer epoch, which lets us ensure that
> there is only one legitimate active instance of a producer with a
> given TransactionalId, and hence lets us maintain transaction
> guarantees in the event of failures.
>
> *Goals*
>
> - What problem is this proposal designed to solve?
>
> When tuned for reliability, users can guarantee that every message
> write will be persisted at least once without data loss. Duplicates
> may appear in the stream due to producer retries.
> For example, a broker might crash between committing a message and
> sending an ack to the producer, causing the producer to retry and
> resulting in duplicate messages in the stream.
>
> However, idempotent producers don’t provide guarantees for writes
> across multiple MessageQueues. For this, one needs stronger
> transactional guarantees, i.e. the ability to write to several
> MessageQueues atomically. By atomically, we mean the ability to commit
> a set of messages across MessageQueues as a unit: either all messages
> are committed, or none of them are.
>
> - To what degree should we solve the problem?
>
> Within a transaction, we also need to make sure that the producer
> generates no duplicate messages. To achieve this, we are going to add
> sequence numbers to messages so that brokers can de-duplicate messages
> per producer and topic partition. For each topic partition that is
> written to, the producer maintains a sequence number counter and
> assigns the next number in the sequence to each new message. The
> broker verifies that the next message produced has been assigned the
> next number and otherwise returns an error. In addition, since the
> sequence number is per producer and topic partition, we also need to
> uniquely identify a producer across multiple sessions (i.e. when the
> producer fails and is recreated, etc.). Hence we introduce a new
> TransactionalId to distinguish producers, along with an epoch number
> so that zombie writers with the same TransactionalId can be fenced.
>
> At any given point in time, a producer can only have one ongoing
> transaction, so we can distinguish messages that belong to different
> transactions by their respective TransactionalId. Producers with the
> same TransactionalId will talk to the same transaction coordinator,
> which also keeps track of their TransactionalIds in addition to
> managing their transaction status.
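
To make the sequence-number and epoch idea above concrete, here is a minimal sketch of what a broker-side check could look like. Everything in it is an assumption for illustration: `SequenceValidator`, `Result`, and the `validate` signature are hypothetical names and not existing RocketMQ APIs. The sketch keys by TransactionalId directly instead of a PID, restarts sequence numbers at 0 for each new epoch, and distinguishes duplicates from gaps, whereas the proposal only says the broker "returns an error".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: SequenceValidator and Result are illustrative names,
// not existing RocketMQ classes.
final class SequenceValidator {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER, FENCED }

    // Highest epoch seen so far for each TransactionalId.
    private final Map<String, Integer> latestEpoch = new HashMap<>();
    // Next expected sequence number per (TransactionalId, epoch, queue).
    private final Map<String, Long> nextSequence = new HashMap<>();

    synchronized Result validate(String transactionalId, int epoch, String queue, long sequence) {
        // A writer with an older epoch than the newest one seen is a zombie.
        int known = latestEpoch.getOrDefault(transactionalId, -1);
        if (epoch < known) {
            return Result.FENCED;
        }
        latestEpoch.put(transactionalId, epoch);

        // Assumption: sequence numbers start again at 0 for every new epoch.
        String key = transactionalId + '#' + epoch + '@' + queue;
        long expected = nextSequence.getOrDefault(key, 0L);
        if (sequence == expected) {
            nextSequence.put(key, expected + 1);
            return Result.ACCEPTED;
        }
        // Lower than expected: a retry of a message already written.
        // Higher than expected: at least one message is missing.
        return sequence < expected ? Result.DUPLICATE : Result.OUT_OF_ORDER;
    }
}
```

With a check like this, a producer retry after a lost ack would be answered with `DUPLICATE` instead of being written a second time, and an old producer instance that is still sending after a new instance registered a higher epoch would be answered with `FENCED`.
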
>
> *Non-Goals*
>
> - What problem is this proposal NOT designed to solve?
> - Are there any limits of this proposal?
>
> *Changes*
>
> *Architecture*
>
> *Interface Design/Change*
>
> - Method signature changes
>
> We add a set of new public APIs to the TransactionMQProducer class and
> describe how these APIs will be implemented.
>
> /* initialize the producer as a transactional producer */
> initTransactions()
>
> The following steps will be taken when initTransactions() is called:
>
> 1. If no TransactionalId has been provided in configuration, skip to
>    step 3.
> 2. Send a FindCoordinatorRequest with the configured TransactionalId
>    and with CoordinatorType encoded as "transaction" to a random
>    broker. Block for the corresponding response, which will return the
>    assigned transaction coordinator for this producer.
> 3. Send an InitPidRequest to the transaction coordinator, or to a
>    random broker if no TransactionalId was provided in configuration.
>    Block for the corresponding response to get the returned PID.
>
> /* start a transaction to produce messages */
> beginTransaction()
>
> The following steps are executed on the producer when beginTransaction
> is called:
>
> 1. Check whether the producer is transactional (i.e. init has been
>    called); if not, throw an exception (we omit this step in the rest
>    of the APIs, but they all need to execute it).
> 2. Check whether a transaction has already been started. If so, raise
>    an exception.
>
> /* send offsets for a given consumer group within this transaction */
> sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
>                          String consumerGroupId)
>
> The following steps are executed on the producer when
> sendOffsetsToTransaction is called:
>
> 1. Check whether it is currently within a transaction; if not, throw
>    an exception; otherwise proceed to the next step.
> 2. Check whether this function has already been called for the given
>    groupId within this transaction.
>    If not, send an AddOffsetsToTxnRequest to the transaction
>    coordinator and block until the corresponding response is received;
>    otherwise proceed to the next step.
> 3. Send a TxnOffsetCommitRequest to the coordinator returned in the
>    response from the previous step, and block until the corresponding
>    response is received.
>
> /* commit the transaction with its produced messages */
> commitTransaction()
>
> The following steps are executed on the producer when
> commitTransaction is called:
>
> 1. Check whether there is an active transaction; if not, throw an
>    exception; otherwise proceed to the next step.
> 2. Call flush to make sure all messages sent in this transaction are
>    acknowledged.
> 3. Send an EndTxnRequest with the COMMIT command to the transaction
>    coordinator, and block until the corresponding response is
>    received.
>
> - Method behavior changes
> - CLI command changes
> - Log format or content changes
>
> *Compatibility, Deprecation, and Migration Plan*
>
> - Are backward and forward compatibility taken into consideration?
> - Are there deprecated APIs?
> - How do we do migration?
>
> *Implementation Outline*
>
> Phase 1
>
> First, before sending data, you need to manually create a transaction
> coordinator to control the one-to-many relationship between PID and
> transactionId.
> To this end, you need to ensure that the offset consumption progress
> is reported to the transaction coordinator (TC) in a timely manner,
> and that messages of submitted but unsettled transactions are not
> deleted by mistake while the scheduled deletion task runs
> concurrently.
>
> Phase 2
>
> Decide whether PullLiteProducer needs to check the progress of the
> current offset against the global PID-visible transaction offset, so
> that the PID -> transactionId mapping achieves cross-partition
> transaction consistency.
>
> Phase 3
>
> When downstream consumers consume, they need to ensure that messages
> marked with a transactionId upstream are perceived by downstream
> consumers.
>
> *Rejected Alternatives*
>
> - How do alternatives solve the issue you proposed?
> - Pros and cons of alternatives
> - Why should we reject the above alternatives?
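
The precondition checks that the proposal lists for beginTransaction() and commitTransaction() can be read as a small state machine on the producer. The sketch below shows only those checks, under stated assumptions: `TransactionalProducerState` is a hypothetical class, not part of TransactionMQProducer, and all network steps from the proposal (FindCoordinatorRequest, InitPidRequest, flush, EndTxnRequest) are reduced to comments.

```java
// Hypothetical sketch of the producer-side state checks only.
// TransactionalProducerState is an illustrative name, not a RocketMQ class.
final class TransactionalProducerState {
    private enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    private State state = State.UNINITIALIZED;

    // In the proposal this is where the coordinator lookup and the
    // InitPidRequest happen; here it only marks the producer as initialized.
    void initTransactions() {
        if (state == State.UNINITIALIZED) {
            state = State.READY;
        }
    }

    void beginTransaction() {
        requireInitialized();
        if (state == State.IN_TRANSACTION) {
            throw new IllegalStateException("a transaction has already been started");
        }
        state = State.IN_TRANSACTION;
    }

    void commitTransaction() {
        requireInitialized();
        if (state != State.IN_TRANSACTION) {
            throw new IllegalStateException("no active transaction to commit");
        }
        // Omitted: flush pending sends, then send EndTxnRequest(COMMIT)
        // to the transaction coordinator and wait for the response.
        state = State.READY;
    }

    boolean inTransaction() {
        return state == State.IN_TRANSACTION;
    }

    // Step 1 of beginTransaction in the proposal, shared by the other APIs.
    private void requireInitialized() {
        if (state == State.UNINITIALIZED) {
            throw new IllegalStateException("initTransactions() has not been called");
        }
    }
}
```

Keeping the checks in one explicit state field makes the "only one ongoing transaction per producer" rule from the proposal easy to enforce: a second beginTransaction() before commitTransaction() fails immediately on the client, without a round trip to the coordinator.
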