This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 072bf5d63d9 [improve][txn] Implementation of Delayed Transaction
Messages (#17548)
072bf5d63d9 is described below
commit 072bf5d63d981784a8f7804e195d27a19bd3dd07
Author: congbo <[email protected]>
AuthorDate: Thu Sep 15 10:14:29 2022 +0800
[improve][txn] Implementation of Delayed Transaction Messages (#17548)
link https://github.com/apache/pulsar/pull/17548
### Motivation
Currently, delayed delivery and transaction messages cannot be used together.
When a transaction message is sent with a delivery delay and the transaction
is committed, the message is received by consumers immediately, ignoring the delay.
For example:
```
@Test
public void testDelayedTransactionMessages() throws Exception {
String topic = NAMESPACE1 + "/testDelayedTransactionMessages";
@Cleanup
Consumer<String> sharedConsumer =
pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("shared-sub")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create();
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
// send delayed messages
for (int i = 0; i < 10; i++) {
producer.newMessage(transaction)
.value("msg-" + i)
.deliverAfter(5, TimeUnit.SECONDS)
.sendAsync();
}
producer.flush();
transaction.commit().get();
Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
// without this change, msg is not null here, so the assertion below fails
assertNull(msg);
}
```
This PR enables clients to send delayed messages within transactions.
### Modifications
Allow transaction messages to be passed to `trackDelayedDelivery`, so that
clients can send delayed messages within a transaction.
It is worth noting that when the dispatcher sends transaction messages to
consumers, it must follow changes to `MaxReadPosition` (for background on
`MaxReadPosition`, see
https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).
Because of `MaxReadPosition`, the dispatch of a transaction message
depends on whether the previous transaction message has completed.
As a result, the actual delay may be longer than requested, but never shorter.
### Verifying this change
Added the test `testDelayedTransactionMessages` to `TransactionEndToEndTest`.
---
.../broker/service/AbstractBaseDispatcher.java | 4 +-
.../client/impl/TransactionEndToEndTest.java | 61 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 77c64bac788..d6f441f02bf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -169,7 +169,9 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
entry.release();
continue;
}
- } else if (msgMetadata == null ||
Markers.isServerOnlyMarker(msgMetadata)) {
+ }
+
+ if (msgMetadata == null ||
Markers.isServerOnlyMarker(msgMetadata)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a
server-only marker
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 37d9eb6967d..ddb9062454b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -31,6 +31,8 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.List;
@@ -1340,4 +1342,63 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
assertEquals(value1, new String(deadLetterConsumer.receive(3,
TimeUnit.SECONDS).getValue()));
assertEquals(value2, new String(deadLetterConsumer.receive(3,
TimeUnit.SECONDS).getValue()));
}
+
+ @Test
+ public void testDelayedTransactionMessages() throws Exception {
+ String topic = NAMESPACE1 + "/testDelayedTransactionMessages";
+
+ @Cleanup
+ Consumer<String> failoverConsumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("failover-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+
+ @Cleanup
+ Consumer<String> sharedConsumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("shared-sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage(transaction)
+ .value("msg-" + i)
+ .deliverAfter(5, TimeUnit.SECONDS)
+ .sendAsync();
+ }
+
+ producer.flush();
+
+ transaction.commit().get();
+
+ // Failover consumer will receive the messages immediately while
+ // the shared consumer will get them after the delay
+ Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
+ assertNull(msg);
+
+ for (int i = 0; i < 10; i++) {
+ msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS);
+ assertEquals(msg.getValue(), "msg-" + i);
+ }
+
+ Set<String> receivedMsgs = new TreeSet<>();
+ for (int i = 0; i < 10; i++) {
+ msg = sharedConsumer.receive(10, TimeUnit.SECONDS);
+ receivedMsgs.add(msg.getValue());
+ }
+
+ assertEquals(receivedMsgs.size(), 10);
+ for (int i = 0; i < 10; i++) {
+ assertTrue(receivedMsgs.contains("msg-" + i));
+ }
+ }
}