congbobo184 opened a new pull request, #17548:
URL: https://github.com/apache/pulsar/pull/17548

   ### Motivation
   Currently, delayed delivery and transaction messages cannot be used together.
   When a message is sent in a transaction with a delivery delay and the
   transaction is then committed, consumers receive the message immediately
   instead of after the delay.
   
   For example:
   ```
       @Test
       public void testDelayedTransactionMessages() throws Exception {
           String topic = NAMESPACE1 + "/testDelayedTransactionMessages";
   
           @Cleanup
           Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                   .topic(topic)
                   .subscriptionName("shared-sub")
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                   .topic(topic)
                   .enableBatching(false)
                   .create();
   
           Transaction transaction = pulsarClient.newTransaction()
                   .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
   
           // send delayed messages
           for (int i = 0; i < 10; i++) {
               producer.newMessage(transaction)
                       .value("msg-" + i)
                       .deliverAfter(5, TimeUnit.SECONDS)
                       .sendAsync();
           }
   
           producer.flush();
   
           transaction.commit().get();
   
           Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
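           // Expected: nothing is delivered before the 5-second delay elapses.
           // Before this change, msg is not null here, so the assertion fails.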
           assertNull(msg);
       }
   ```
   This PR allows clients to send delayed messages within a transaction.
   
   ### Modifications
   Allow transaction messages to be passed to `trackDelayedDelivery`, so that
   clients can send delayed messages within a transaction.
   
   Note that the dispatcher must still respect `MaxReadPosition` when it sends
   transaction messages to consumers. For background on `MaxReadPosition`, see
   the transaction buffer design RFC:
   https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md
 
   
   Because of `maxReadPosition`, a transaction message can be dispatched only
   after the preceding transaction messages have completed. As a result, the
   effective delay may be longer than the requested delay, but never shorter.
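
The interaction between the requested delay and `maxReadPosition` can be illustrated with a minimal sketch. It is not the broker's actual code: the class `DelayedTxnDeliverySketch`, both method names, and the use of a plain `long` for a position (Pulsar positions are ledger/entry pairs) are assumptions made only for illustration.

```java
// Illustrative sketch only; names and types are hypothetical, not Pulsar broker APIs.
public class DelayedTxnDeliverySketch {

    // A message can be dispatched only when both conditions hold:
    // (1) its requested delivery time has been reached, and
    // (2) maxReadPosition has advanced to or past the message's position.
    // Positions are simplified to a single long here (assumption).
    public static boolean isDeliverable(long nowMs, long deliverAtMs,
                                        long position, long maxReadPosition) {
        return nowMs >= deliverAtMs && position <= maxReadPosition;
    }

    // Earliest time the message can be dispatched, given the time at which
    // maxReadPosition reaches the message (readableAtMs). Taking the maximum
    // means the delay can be extended but never shortened.
    public static long effectiveDeliveryTimeMs(long deliverAtMs, long readableAtMs) {
        return Math.max(deliverAtMs, readableAtMs);
    }

    public static void main(String[] args) {
        long deliverAtMs = 5_000;
        // Preceding transactions complete at 2 s: the requested delay applies.
        System.out.println(effectiveDeliveryTimeMs(deliverAtMs, 2_000)); // prints 5000
        // Preceding transactions complete at 8 s: the delay is extended.
        System.out.println(effectiveDeliveryTimeMs(deliverAtMs, 8_000)); // prints 8000
    }
}
```

In this model, a message with a 5-second delay whose preceding transaction finishes after 8 seconds is delivered at 8 seconds, which is the "extended, but not shortened" behavior.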
   
   ### Verifying this change
   This PR adds a test covering delayed messages sent within a transaction.
   

