liangyepianzhou opened a new issue, #15423:
URL: https://github.com/apache/pulsar/issues/15423

   ## Motivation
   
   In the current implementation, if a client sends messages with a transaction that has already been committed, those messages become visible to consumers immediately. This is confusing, because the transaction has already ended.
   
   ## Goal
   * Add a map in the Transaction Buffer (TB) that stores the lowWaterMark of each Transaction Coordinator (TC).
   * Check the lowWaterMark before appending a transaction entry to the TB.
   
   ## What is LowWaterMark
   A transaction ID is identified by mostSigBits and leastSigBits. The mostSigBits is the ID of the TC, and the leastSigBits is an incremental ID within that TC. LowWaterMark is the smallest leastSigBits among the transactions that have not ended, i.e. the smallest transaction that is still ongoing.
   
![image](https://user-images.githubusercontent.com/55571188/166635162-d48874aa-8a1e-4443-a045-673779954bdd.png)
   
   ## How LowWaterMark works for TB now
   Currently, the TB gets the lowWaterMark from the TC when a transaction is ended on the topic. It then checks whether any ongoing transaction is below the lowWaterMark. If so, it appends an abort marker for that transaction and updates maxReadPosition.
   However, aborted transaction messages are filtered with a collection (aborts) rather than with the markers, so a consumer can still receive transaction messages that were sent after the transaction was committed.
   
![image](https://user-images.githubusercontent.com/55571188/166636135-0d966f50-f191-4263-becb-b55c86438644.png)
   
   ## What is expected
   Store the lowWaterMark in the TransactionBuffer when the TC ends a transaction.
   Then check the lowWaterMark before appending a transaction entry to the TB.
   
![image](https://user-images.githubusercontent.com/55571188/166638125-c9829144-7bb0-48f1-80a8-3f7c454ac128.png)
   ## Implementation
   1. Store the lowWaterMark when a transaction is ended.
   ```java
   lowWaterMarks.compute(txnID.getMostSigBits(), (k, v) -> lowWaterMark);
   ```
   2. Check the lowWaterMark before appending a transaction message to the TB.
   ```java
     // Look up the stored lowWaterMark of this transaction's TC once.
     Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits());
     // A transaction at or below the lowWaterMark has already ended, so fail the append.
     if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) {
         completableFuture.completeExceptionally(new TransactionBufferException
                 .TransactionNotFoundException("Transaction [" + txnId + "] has been ended. "
                 + "Please use a new transaction to send message."));
     }
   ```
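
To see how the two steps fit together, here is a minimal, self-contained sketch that runs outside of Pulsar. The names `LowWaterMarkSketch`, `TxnId`, `onEndTxn`, and `appendToTxn` are hypothetical, and `IllegalStateException` stands in for `TransactionBufferException.TransactionNotFoundException`. Keeping the larger of the old and new lowWaterMark is an extra assumption that is not part of the snippets above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class LowWaterMarkSketch {

    /** Hypothetical stand-in for Pulsar's TxnID: TC ID plus a per-TC incremental ID. */
    static final class TxnId {
        final long mostSigBits;   // ID of the TC
        final long leastSigBits;  // incremental ID within that TC

        TxnId(long mostSigBits, long leastSigBits) {
            this.mostSigBits = mostSigBits;
            this.leastSigBits = leastSigBits;
        }

        @Override
        public String toString() {
            return "(" + mostSigBits + "," + leastSigBits + ")";
        }
    }

    // TC ID (mostSigBits) -> lowWaterMark last stored for that TC.
    private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();

    /** Step 1: store the lowWaterMark that arrives with an end-transaction request. */
    void onEndTxn(TxnId txnId, long lowWaterMark) {
        // Assumption: keep the larger value so a late request cannot lower the mark.
        lowWaterMarks.merge(txnId.mostSigBits, lowWaterMark, Long::max);
    }

    /** Step 2: fail the append if the transaction is at or below its TC's lowWaterMark. */
    CompletableFuture<Void> appendToTxn(TxnId txnId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        Long lowWaterMark = lowWaterMarks.get(txnId.mostSigBits);
        if (lowWaterMark != null && lowWaterMark >= txnId.leastSigBits) {
            future.completeExceptionally(new IllegalStateException(
                    "Transaction [" + txnId + "] has been ended. "
                    + "Please use a new transaction to send message."));
            return future;
        }
        // A real buffer would write the entry here; the sketch simply succeeds.
        future.complete(null);
        return future;
    }
}
```

Because the map is keyed by mostSigBits, a lowWaterMark stored for one TC never rejects transactions that belong to another TC.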
   

