This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 072bf5d63d9 [improve][txn] Implementation of Delayed Transaction 
Messages (#17548)
072bf5d63d9 is described below

commit 072bf5d63d981784a8f7804e195d27a19bd3dd07
Author: congbo <[email protected]>
AuthorDate: Thu Sep 15 10:14:29 2022 +0800

    [improve][txn] Implementation of Delayed Transaction Messages (#17548)
    
    link https://github.com/apache/pulsar/pull/17548
    ### Motivation
    now delayed features and transaction messages cannot be used together.
    When sending a transaction message with a delayed time and commit this 
transaction, the message will be immediately received by consumers.
    
    Code, eg.
    ```
        @Test
        public void testDelayedTransactionMessages() throws Exception {
            String topic = NAMESPACE1 + "/testDelayedTransactionMessages";
    
            @Cleanup
            Consumer<String> sharedConsumer = 
pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionName("shared-sub")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
    
            @Cleanup
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                    .topic(topic)
                    .enableBatching(false)
                    .create();
    
            Transaction transaction = pulsarClient.newTransaction()
                    .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
    
            // send delayed messages
            for (int i = 0; i < 10; i++) {
                producer.newMessage(transaction)
                        .value("msg-" + i)
                        .deliverAfter(5, TimeUnit.SECONDS)
                        .sendAsync();
            }
    
            producer.flush();
    
            transaction.commit().get();
    
            Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
            // the msg now is not null
            assertNull(msg);
        }
    ```
    This PR will implement clients to send delayed messages with transactions.
    
    ### Modifications
    make transaction message can be put in `trackDelayedDelivery` to implement 
client send delayed messages with the transaction.
    
    It is worth noting that the dispatcher sends transaction messages to 
consumers and should follow the `MaxReadPosition` change—(something about 
`MaxReadPosition` 
https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).
    
    Because of the existence of maxReadPosition, the distribution of 
transaction messages depends on whether the previous transaction message is 
completed. This will cause delay time extended, but not shortened
    
    ### Verifying this change
    add the test
---
 .../broker/service/AbstractBaseDispatcher.java     |  4 +-
 .../client/impl/TransactionEndToEndTest.java       | 61 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 77c64bac788..d6f441f02bf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -169,7 +169,9 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
                     entry.release();
                     continue;
                 }
-            } else if (msgMetadata == null || 
Markers.isServerOnlyMarker(msgMetadata)) {
+            }
+
+            if (msgMetadata == null || 
Markers.isServerOnlyMarker(msgMetadata)) {
                 PositionImpl pos = (PositionImpl) entry.getPosition();
                 // Message metadata was corrupted or the messages was a 
server-only marker
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 37d9eb6967d..ddb9062454b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -31,6 +31,8 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.ArrayList;
 import java.util.List;
@@ -1340,4 +1342,63 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         assertEquals(value1, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
         assertEquals(value2, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
     }
+
+    @Test
+    public void testDelayedTransactionMessages() throws Exception {
+        String topic = NAMESPACE1 + "/testDelayedTransactionMessages";
+
+        @Cleanup
+        Consumer<String> failoverConsumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("failover-sub")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe();
+
+        @Cleanup
+        Consumer<String> sharedConsumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("shared-sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage(transaction)
+                    .value("msg-" + i)
+                    .deliverAfter(5, TimeUnit.SECONDS)
+                    .sendAsync();
+        }
+
+        producer.flush();
+
+        transaction.commit().get();
+
+        // Failover consumer will receive the messages immediately while
+        // the shared consumer will get them after the delay
+        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
+        assertNull(msg);
+
+        for (int i = 0; i < 10; i++) {
+            msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS);
+            assertEquals(msg.getValue(), "msg-" + i);
+        }
+
+        Set<String> receivedMsgs = new TreeSet<>();
+        for (int i = 0; i < 10; i++) {
+            msg = sharedConsumer.receive(10, TimeUnit.SECONDS);
+            receivedMsgs.add(msg.getValue());
+        }
+
+        assertEquals(receivedMsgs.size(), 10);
+        for (int i = 0; i < 10; i++) {
+            assertTrue(receivedMsgs.contains("msg-" + i));
+        }
+    }
 }

Reply via email to