liangyepianzhou opened a new issue, #16042:
URL: https://github.com/apache/pulsar/issues/16042

   ## Motivation
   The transaction buffer stores aborted transaction IDs so that it can filter out messages of aborted transactions. To support recovery, the transaction buffer takes snapshots that store the aborted transaction IDs in BookKeeper, but the total size of the aborted transaction IDs is not limited. When the aborted transaction IDs are larger than a single BookKeeper entry can hold, the transaction buffer needs to split the snapshot into multiple snapshots stored in multiple entries (a multiple-snapshot).
   <img width="1202" alt="image" src="https://user-images.githubusercontent.com/55571188/173371035-3746df19-b7a6-4d49-90f8-6d64a1de7526.png">
   ## Challenges
   Because of topic compaction and incomplete writes, implementing multiple-snapshots raises the following challenges.
   1. If the broker restarts, the transaction buffer may write only part of a multiple-snapshot.
       * For example, the transaction buffer needs to write a multiple-snapshot (1, 2, 3), but it writes only snapshots 1 and 2 before the broker restarts.
   <img width="1206" alt="image" src="https://user-images.githubusercontent.com/55571188/173371118-6d8bea11-efa5-49ef-96ba-c5d36e222126.png">
   2. Because of compaction, a new snapshot replaces an old snapshot with the same key.
       * For example, a multiple-snapshot (1, 2, 3) may end up with snapshots 1 and 2 from the second write and snapshot 3 from the first write.
   <img width="1446" alt="image" src="https://user-images.githubusercontent.com/55571188/173371183-06741283-833f-40d1-b1df-2934575dd131.png">

   ## Approach
   ### Implementation
   1. Change `aborts` from a `LinkedMap` to a `ConcurrentSkipListMap`.
   2. Send a multiple-snapshot with the keys `topicName-1`, `topicName-2`, ..., `topicName-end`, and send a normal snapshot with the key `topicName-end`.
   3. Store `maxReadPosition` only in the snapshot with the key `topicName-end`.
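   The key scheme in the list above can be sketched as follows. This is a minimal, hypothetical illustration and not Pulsar code. `SnapshotEntry`, `SnapshotSplitter` and `maxIdsPerEntry` are invented names. Transaction IDs and positions are plain strings, and the entry size limit is modeled as a count of IDs rather than bytes. The sketch also assumes that the last entry of a multiple-snapshot carries the key `topicName-end`, so a snapshot that fits in one entry is simply a normal `topicName-end` snapshot.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical snapshot entry; transaction IDs and positions are plain strings here.
   final class SnapshotEntry {
       final String key;                 // message key, used by compaction
       final List<String> abortedTxnIds; // aborted transaction IDs stored in this entry
       final String maxReadPosition;     // set only on the topicName-end entry

       SnapshotEntry(String key, List<String> abortedTxnIds, String maxReadPosition) {
           this.key = key;
           this.abortedTxnIds = abortedTxnIds;
           this.maxReadPosition = maxReadPosition;
       }
   }

   final class SnapshotSplitter {
       // Splits aborts (already sorted by abort-marker position) into entries holding at most
       // maxIdsPerEntry IDs, keyed topicName-1, topicName-2, ..., with the last keyed topicName-end.
       static List<SnapshotEntry> split(String topicName, List<String> sortedAborts,
                                        int maxIdsPerEntry, String maxReadPosition) {
           if (maxIdsPerEntry <= 0) {
               throw new IllegalArgumentException("maxIdsPerEntry must be positive");
           }
           int total = sortedAborts.size();
           // Always produce at least one entry so that a topicName-end snapshot exists.
           int count = Math.max(1, (total + maxIdsPerEntry - 1) / maxIdsPerEntry);
           List<SnapshotEntry> entries = new ArrayList<>();
           for (int i = 0; i < count; i++) {
               int from = i * maxIdsPerEntry;
               int to = Math.min(total, from + maxIdsPerEntry);
               List<String> ids = new ArrayList<>(sortedAborts.subList(from, to));
               boolean last = i == count - 1;
               String key = last ? topicName + "-end" : topicName + "-" + (i + 1);
               entries.add(new SnapshotEntry(key, ids, last ? maxReadPosition : null));
           }
           return entries;
       }
   }
   ```

   Take five aborted IDs with `maxIdsPerEntry = 2`. The split yields the keys `topicName-1`, `topicName-2` and `topicName-end`. Every entry except the last is full. So as long as IDs are only appended, each numbered key keeps mapping to the same slice of the sorted list.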
   ### Goal
   1. Aborted transaction IDs are sorted by the position of their abort markers, so `aborts` behaves as a FIFO map.
   2. As a result, a new snapshot replacing an old snapshot with the same key during compaction does not cause errors.
   3. There is always a snapshot with a correct `maxReadPosition` to recover from.
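   Goal 1 depends on `aborts` iterating in abort-marker order. Below is a minimal sketch of that idea. It assumes the map is keyed by the abort marker's position; the proposal only says that `aborts` becomes a `ConcurrentSkipListMap` sorted by that position. `Pos`, `SortedAborts` and `onLedgersDeleted` are hypothetical names, not Pulsar's position class or transaction buffer API.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ConcurrentSkipListMap;

   // Hypothetical stand-in for a ledger position (ledgerId, entryId), ordered like a position.
   final class Pos implements Comparable<Pos> {
       final long ledgerId;
       final long entryId;

       Pos(long ledgerId, long entryId) {
           this.ledgerId = ledgerId;
           this.entryId = entryId;
       }

       @Override
       public int compareTo(Pos other) {
           int byLedger = Long.compare(ledgerId, other.ledgerId);
           return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
       }
   }

   final class SortedAborts {
       // Key: position of the abort marker; value: aborted transaction ID (plain string here).
       private final ConcurrentSkipListMap<Pos, String> aborts = new ConcurrentSkipListMap<>();

       void addAborted(String txnId, Pos abortMarker) {
           aborts.put(abortMarker, txnId);
       }

       // Drops every abort whose marker lies in a ledger older than firstRemainingLedgerId.
       // Because the map is sorted by position, this always removes a prefix (FIFO order).
       void onLedgersDeleted(long firstRemainingLedgerId) {
           aborts.headMap(new Pos(firstRemainingLedgerId, Long.MIN_VALUE)).clear();
       }

       // Aborted IDs in ascending abort-marker order, i.e. the order snapshots would store them.
       List<String> inMarkerOrder() {
           return new ArrayList<>(aborts.values());
       }
   }
   ```

   Removals caused by ledger deletion always take a prefix of the sorted map. The remaining IDs therefore keep their relative order, which the Examples section relies on.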
   ### Examples
   #### Normal Flow
   The first snapshot is taken when a new producer is created to send messages, so there is always a snapshot with the key `topicName-end` that holds a `maxReadPosition` to recover from.
   <img width="1570" alt="image" src="https://user-images.githubusercontent.com/55571188/173371317-43b5eb10-287a-4f72-8e85-6e440d9bf14d.png">
   #### Incomplete write
   When transaction IDs are sorted by the position of their abort markers and none have been removed from `aborts`, snapshots with the same key (excluding `topicName-end`) store the same transaction IDs.
   <img width="916" alt="image" src="https://user-images.githubusercontent.com/55571188/173371415-956a77a5-e2c4-43d6-90cb-5864cdcf4ad7.png">
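   The incomplete-write case can be checked with a small simulation. It is a hypothetical model, not Pulsar code. `CompactedSnapshots` stands in for a compacted system topic that holds the snapshots of a single topic and keeps only the newest value per key. Positions are plain strings.

   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeMap;

   // Hypothetical model: a compacted topic retains only the newest value written for each key.
   final class CompactedSnapshots {
       private final Map<String, List<String>> latestByKey = new TreeMap<>();
       private String maxReadPosition; // taken from the newest topicName-end entry

       void write(String key, List<String> abortedTxnIds, String maxReadPositionOrNull) {
           latestByKey.put(key, abortedTxnIds); // overwrites the older value for this key
           if (key.endsWith("-end")) {
               maxReadPosition = maxReadPositionOrNull;
           }
       }

       // Union of aborted IDs across all surviving keys; duplicates collapse in the set.
       Set<String> recoveredAborts() {
           Set<String> ids = new HashSet<>();
           latestByKey.values().forEach(ids::addAll);
           return ids;
       }

       String recoveredMaxReadPosition() {
           return maxReadPosition;
       }
   }
   ```

   Consider this sequence:
   1. The first write stores `t-1: [a, b]` and `t-end: [c]` with `maxReadPosition` `1:10`.
   2. A second write (aborts `a` to `e`) stores `t-1: [a, b]` and `t-2: [c, d]`.
   3. The broker restarts before `t-end` is written.

   Recovery still sees `a`, `b`, `c` and `d`, together with the older `maxReadPosition` `1:10`. Assuming recovery replays the entries after `maxReadPosition`, `e` is rebuilt from the log.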
   #### Incomplete write after transaction IDs are removed because ledgers were deleted
   Because transaction IDs are removed in the order of their abort-marker positions, no aborted transaction ID that is still needed is lost when compaction replaces old snapshots with new ones, and there is always a valid `maxReadPosition` to recover from.
   <img width="895" alt="image" src="https://user-images.githubusercontent.com/55571188/173371460-609e8ed5-5fa6-4656-accb-19be09fadf1a.png">
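   The same kind of simulation covers ledger deletion. It uses the same hypothetical assumptions: plain-string IDs, two IDs per entry, keys `topic-1`, ..., `topic-end`, and a map standing in for the compacted topic. `writeSnapshot` writes only the first `written` entries to model a restart mid-write. `recover` unions the surviving entries.

   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical simulation of the ledger-deletion example; not Pulsar code.
   final class DeletionScenario {
       static final int PER_ENTRY = 2; // assumed entry capacity, in aborted IDs

       // Writes the first `written` entries of a snapshot of `sortedAborts` into `compacted`,
       // modeling a broker restart after `written` entries. Compaction keeps the newest value per key.
       static void writeSnapshot(Map<String, List<String>> compacted, String topic,
                                 List<String> sortedAborts, int written) {
           int total = sortedAborts.size();
           int count = Math.max(1, (total + PER_ENTRY - 1) / PER_ENTRY);
           for (int i = 0; i < count && i < written; i++) {
               List<String> chunk = new ArrayList<>(
                       sortedAborts.subList(i * PER_ENTRY, Math.min(total, (i + 1) * PER_ENTRY)));
               String key = i == count - 1 ? topic + "-end" : topic + "-" + (i + 1);
               compacted.put(key, chunk);
           }
       }

       // Recovery unions the aborted IDs of every surviving key.
       static Set<String> recover(Map<String, List<String>> compacted) {
           Set<String> ids = new HashSet<>();
           compacted.values().forEach(ids::addAll);
           return ids;
       }
   }
   ```

   Consider this sequence:
   1. A snapshot of `a` to `e` is written completely.
   2. Ledger deletion removes `a` and `b`, a prefix of the sorted aborts.
   3. The broker restarts after writing only `t-1: [c, d]` of the new snapshot.

   The compacted view then holds `t-1: [c, d]`, a stale `t-2: [c, d]` and the old `t-end: [e]`. Recovery returns exactly `c`, `d` and `e`, and the old `t-end` entry still supplies the earlier `maxReadPosition`.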
   
   ## Rejected Alternatives
   
   ### Add a snapshotEntryCount field to TransactionBufferSnapshot
   Add a `snapshotEntryCount` field to each `TransactionBufferSnapshot`. For a normal snapshot, `snapshotEntryCount` is set to 1; for a multiple-snapshot, it is set to the number of entries used to store the snapshot.
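   ```java
   import java.util.List;

   public class TransactionBufferSnapshot {
       private String topicName;
       // Ledger ID and entry ID of maxReadPosition.
       private long maxReadPositionLedgerId;
       private long maxReadPositionEntryId;
       // 1 for a normal snapshot; the number of entries for a multiple-snapshot.
       private long snapshotEntryCount;
       private List<AbortTxnMetadata> aborts;
   }
   ```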
   ### Mark multiple-snapshots with a null field
   For a multiple-snapshot, only the `aborts` and `maxReadPosition` data are written in the leading entries, without setting `topicName`; `topicName` is set only in the last entry. When the reader reads an entry with `topicName == null`, a multiple-snapshot has started, and the next entry with `topicName != null` marks its end.
   
![image](https://user-images.githubusercontent.com/55571188/173372822-1c830396-1512-4264-843f-662dc88dd010.png)
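   For comparison, the reading rule of this rejected alternative can be sketched as below. `RawEntry` and `NullMarkedReader` are hypothetical. The sketch reads the raw entry sequence of a single topic in write order and does not model compaction. Entries after the last non-null `topicName` belong to an unfinished write and are ignored.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical entry for the "null topicName" alternative.
   final class RawEntry {
       final String topicName;           // null for every entry except the last one
       final List<String> abortedTxnIds;

       RawEntry(String topicName, List<String> abortedTxnIds) {
           this.topicName = topicName;
           this.abortedTxnIds = abortedTxnIds;
       }
   }

   final class NullMarkedReader {
       // Returns the aborts of the last complete multiple-snapshot in the entry stream.
       static List<String> lastCompleteSnapshot(List<RawEntry> stream) {
           List<String> pending = new ArrayList<>();
           List<String> lastComplete = new ArrayList<>();
           for (RawEntry entry : stream) {
               pending.addAll(entry.abortedTxnIds);
               if (entry.topicName != null) { // a non-null topicName closes the group
                   lastComplete = pending;
                   pending = new ArrayList<>();
               }
           }
           return lastComplete;
       }
   }
   ```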
   
   ## API Changes
   ```java
   import java.util.concurrent.CompletableFuture;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.PulsarClientException;

   interface Writer<T> {
       /**
        * Write an event to the system topic.
        * @param t pulsar event
        * @param topic the topic name of the event, used as the message key
        * @return message id
        * @throws PulsarClientException if writing the event fails
        */
       MessageId write(T t, String topic) throws PulsarClientException;

       /**
        * Asynchronously write an event to the system topic.
        * @param t pulsar event
        * @param topic the topic name of the event, used as the message key
        * @return message id future
        */
       CompletableFuture<MessageId> writeAsync(T t, String topic);
   }
   ```
   ### Implementation
   ```java
   // In TransactionBufferSnapshotWriter:
   @Override
   public CompletableFuture<MessageId> writeAsync(TransactionBufferSnapshot transactionBufferSnapshot,
                                                  String topicName) {
       // The key (topicName-1, ..., topicName-end) determines which older snapshot compaction replaces.
       return producer.newMessage().key(topicName)
               .value(transactionBufferSnapshot).sendAsync();
   }
   ```
   
   

