This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 77ff53bab90 [fix] [txn] fix consumer can receive aborted txn message
when readType is replay (#19815)
77ff53bab90 is described below
commit 77ff53bab90405bc501878946833dfa1498edc97
Author: ken <[email protected]>
AuthorDate: Wed Jul 5 17:00:57 2023 +0800
[fix] [txn] fix consumer can receive aborted txn message when readType is
replay (#19815)
Co-authored-by: fanjianye <[email protected]>
---
.../broker/service/AbstractBaseDispatcher.java | 2 +-
.../broker/transaction/TransactionConsumeTest.java | 60 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d6184c657c4..c9bf2d6770f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -164,7 +164,7 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
entry.release();
continue;
}
- if (!isReplayRead && msgMetadata != null &&
msgMetadata.hasTxnidMostBits()
+ if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
// because consumer can receive message is smaller than
maxReadPosition,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index 32efdb51659..847eb31da4f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -31,6 +32,9 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
+import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -223,6 +227,62 @@ public class TransactionConsumeTest extends
TransactionTestBase {
log.info("TransactionConsumeTest sortedTest finish.");
}
+ @Test
+ public void testMessageRedelivery() throws Exception {
+ int transactionMessageCnt = 10;
+ String subName = "shared-test";
+
+ @Cleanup
+ Consumer<byte[]> sharedConsumer = pulsarClient.newConsumer()
+ .topic(CONSUME_TOPIC)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ Awaitility.await().until(sharedConsumer::isConnected);
+
+ long mostSigBits = 2L;
+ long leastSigBits = 5L;
+ TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+
+ // produce batch message with txn and then abort
+ PersistentTopic persistentTopic = (PersistentTopic)
getPulsarServiceList().get(0).getBrokerService()
+ .getTopic(CONSUME_TOPIC, false).get().get();
+
+ List<String> sendMessageList = new ArrayList<>();
+ List<MessageIdData> messageIdDataList =
appendTransactionMessages(txnID, persistentTopic, transactionMessageCnt,
sendMessageList);
+
+ persistentTopic.endTxn(txnID, TxnAction.ABORT_VALUE, 0L).get();
+ log.info("Abort txn.");
+
+ // redeliver transaction messages to shared consumer
+ PersistentSubscription subRef =
persistentTopic.getSubscription(subName);
+ PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) subRef
+ .getDispatcher();
+ Field redeliveryMessagesField =
PersistentDispatcherMultipleConsumers.class
+ .getDeclaredField("redeliveryMessages");
+ redeliveryMessagesField.setAccessible(true);
+ MessageRedeliveryController redeliveryMessages = new
MessageRedeliveryController(true);
+
+ final Field totalAvailablePermitsField =
PersistentDispatcherMultipleConsumers.class
+ .getDeclaredField("totalAvailablePermits");
+ totalAvailablePermitsField.setAccessible(true);
+ totalAvailablePermitsField.set(dispatcher, 1000);
+
+ for (MessageIdData messageIdData : messageIdDataList) {
+ redeliveryMessages.add(messageIdData.getLedgerId(),
messageIdData.getEntryId());
+ }
+
+ redeliveryMessagesField.set(dispatcher, redeliveryMessages);
+ dispatcher.readMoreEntries();
+
+ // shared consumer should not receive the redelivered aborted
transaction messages
+ Message message = sharedConsumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+
+ log.info("TransactionConsumeTest testMessageRedelivery finish.");
+ }
+
private void sendNormalMessages(Producer<byte[]> producer, int startMsgCnt,
int messageCnt, List<String>
sendMessageList)
throws PulsarClientException {