This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 77fca3e  [Transaction] Fix transaction message ack (#8007)
77fca3e is described below

commit 77fca3e59044a723cc4a06b525db6a869330bae1
Author: ran <[email protected]>
AuthorDate: Wed Sep 16 17:47:03 2020 +0800

    [Transaction] Fix transaction message ack (#8007)
    
    ### Motivation
    
    The transaction message ack is not well.
    
    ### Modifications
    
    Fix the transaction message ack.
---
 .../PersistentDispatcherMultipleConsumers.java     |   7 +
 .../PersistentDispatcherSingleActiveConsumer.java  |   7 +
 .../service/persistent/TransactionReader.java      |  32 ++--
 .../impl/PersistentTransactionBufferReader.java    |   2 +-
 .../broker/transaction/TransactionConsumeTest.java |  18 +-
 .../pulsar/client/transaction/EndToEndTest.java    | 182 +++++++++++++++++----
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   3 +-
 7 files changed, 197 insertions(+), 54 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a053681..37c5d9d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -59,6 +59,7 @@ import 
org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
+import 
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -571,6 +572,12 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 // Notify the consumer only if all the messages were already 
acknowledged
                 consumerList.forEach(Consumer::reachedEndOfTopic);
             }
+        } else if (exception.getCause() instanceof 
TransactionNotSealedException) {
+            waitTimeMillis = 1;
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Error reading transaction entries : {}, Read 
Type {} - Retrying to read in {} seconds",
+                        name, exception.getMessage(), readType, waitTimeMillis 
/ 1000.0);
+            }
         } else if (!(exception instanceof TooManyRequestsException)) {
             log.error("[{}] Error reading entries at {} : {}, Read Type {} - 
Retrying to read in {} seconds", name,
                     cursor.getReadPosition(), exception.getMessage(), 
readType, waitTimeMillis / 1000.0);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 0e6d747..e71591e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -49,6 +49,7 @@ import 
org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
+import 
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -479,6 +480,12 @@ public final class 
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
                 // Notify the consumer only if all the messages were already 
acknowledged
                 consumers.forEach(Consumer::reachedEndOfTopic);
             }
+        } else if (exception.getCause() instanceof 
TransactionNotSealedException) {
+            waitTimeMillis = 1;
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Error reading transaction entries : {}, - 
Retrying to read in {} seconds", name,
+                        exception.getMessage(), waitTimeMillis / 1000.0);
+            }
         } else if (!(exception instanceof TooManyRequestsException)) {
             log.error("[{}-{}] Error reading entries at {} : {} - Retrying to 
read in {} seconds", name, c,
                     cursor.getReadPosition(), exception.getMessage(), 
waitTimeMillis / 1000.0);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java
index b954506..d5455a0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java
@@ -20,23 +20,21 @@ package org.apache.pulsar.broker.service.persistent;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import com.google.common.collect.Queues;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
-import org.apache.pulsar.broker.transaction.buffer.TransactionEntry;
+import 
org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;
+import 
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 
 /**
@@ -112,6 +110,15 @@ public class TransactionReader {
         }
         transactionBufferReader.thenAccept(reader -> {
             reader.readNext(readMessageNum).whenComplete((transactionEntries, 
throwable) -> {
+                if (throwable != null && throwable.getCause() instanceof 
EndOfTransactionException) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("transaction {} read finished.", txnID);
+                    }
+                    resetReader(txnID, reader);
+                    
readEntriesCallback.readEntriesComplete(Collections.EMPTY_LIST, ctx);
+                    return;
+                }
+
                 if (throwable != null) {
                     log.error("Read transaction messages failed.", throwable);
                     readEntriesCallback.readEntriesFailed(
@@ -131,15 +138,20 @@ public class TransactionReader {
                                 
transactionEntries.get(0).committedAtEntryId()),
                         transactionEntries.get(0).numMessageInTxn());
 
-                if (transactionEntries.size() < readMessageNum) {
-                    resetReader(txnID, reader);
-                }
                 readEntriesCallback.readEntriesComplete(new 
ArrayList<>(transactionEntries), ctx);
             });
         }).exceptionally(throwable -> {
-            log.error("Open transactionBufferReader failed.", throwable);
-            readEntriesCallback.readEntriesFailed(
-                    
ManagedLedgerException.getManagedLedgerException(throwable), ctx);
+            transactionBufferReader = null;
+            if (throwable.getCause() instanceof TransactionNotSealedException) 
{
+                if (log.isDebugEnabled()) {
+                    log.debug("transaction {} is not sealed, failed to open 
transactionBufferReader.", txnID);
+                }
+                readEntriesCallback.readEntriesFailed(
+                        
ManagedLedgerException.getManagedLedgerException(throwable.getCause()), ctx);
+                return null;
+            }
+            log.error("open transactionBufferReader failed.", throwable);
+            
readEntriesCallback.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable),
 ctx);
             return null;
         });
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java
index 0f1ab70..6956cfb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java
@@ -56,7 +56,7 @@ public class PersistentTransactionBufferReader implements 
TransactionBufferReade
 
     PersistentTransactionBufferReader(TransactionMeta meta, ManagedLedger 
ledger)
         throws TransactionNotSealedException {
-        if (TxnStatus.OPEN == meta.status()) {
+        if (TxnStatus.COMMITTED != meta.status()) {
             throw new TransactionNotSealedException("Transaction `" + 
meta.id() + "` is not sealed yet");
         }
         this.meta = meta;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index ac70225..b2f3221 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -114,10 +114,11 @@ public class TransactionConsumeTest extends 
TransactionTestBase {
         appendTransactionMessages(txnID, transactionBuffer, 
transactionMessageCnt);
         sendNormalMessages(producer, messageCntBeforeTxn, messageCntAfterTxn);
 
+        Message<byte[]> message;
         for (int i = 0; i < totalMsgCnt; i++) {
             if (i < (messageCntBeforeTxn + messageCntAfterTxn)) {
                 // receive normal messages successfully
-                Message<byte[]> message = exclusiveConsumer.receive(2, 
TimeUnit.SECONDS);
+                message = exclusiveConsumer.receive(2, TimeUnit.SECONDS);
                 Assert.assertNotNull(message);
                 log.info("Receive exclusive normal msg: {}" + new 
String(message.getData(), UTF_8));
                 message = sharedConsumer.receive(2, TimeUnit.SECONDS);
@@ -125,33 +126,34 @@ public class TransactionConsumeTest extends 
TransactionTestBase {
                 log.info("Receive shared normal msg: {}" + new 
String(message.getData(), UTF_8));
             } else {
                 // can't receive transaction messages before commit
-                Message<byte[]> message = exclusiveConsumer.receive(2, 
TimeUnit.SECONDS);
+                message = exclusiveConsumer.receive(2, TimeUnit.SECONDS);
                 Assert.assertNull(message);
+                log.info("exclusive consumer can't receive message before 
commit.");
+
                 message = sharedConsumer.receive(2, TimeUnit.SECONDS);
                 Assert.assertNull(message);
-                log.info("Can't receive message before commit.");
+                log.info("shared consumer can't receive message before 
commit.");
             }
         }
 
-        transactionBuffer.endTxnOnPartition(txnID, 
PulsarApi.TxnAction.COMMIT.getNumber());
-        Thread.sleep(1000);
+        transactionBuffer.endTxnOnPartition(txnID, 
PulsarApi.TxnAction.COMMIT.getNumber()).get();
         log.info("Commit txn.");
 
         Map<String, Integer> exclusiveBatchIndexMap = new HashMap<>();
         Map<String, Integer> sharedBatchIndexMap = new HashMap<>();
         // receive transaction messages successfully after commit
         for (int i = 0; i < transactionMessageCnt; i++) {
-            Message<byte[]> message = exclusiveConsumer.receive(2, 
TimeUnit.SECONDS);
+            message = exclusiveConsumer.receive(5, TimeUnit.SECONDS);
             Assert.assertNotNull(message);
             Assert.assertTrue(message.getMessageId() instanceof 
BatchMessageIdImpl);
             checkBatchIndex(exclusiveBatchIndexMap, (BatchMessageIdImpl) 
message.getMessageId());
             log.info("Receive txn exclusive id: {}, msg: {}", 
message.getMessageId(), new String(message.getData()));
 
-            message = sharedConsumer.receive(2, TimeUnit.SECONDS);
+            message = sharedConsumer.receive(5, TimeUnit.SECONDS);
             Assert.assertNotNull(message);
             Assert.assertTrue(message.getMessageId() instanceof 
BatchMessageIdImpl);
             checkBatchIndex(sharedBatchIndexMap, (BatchMessageIdImpl) 
message.getMessageId());
-            log.info("Receive txn shared id: {}, msg: {}", 
message.getMessageId(), new String(message.getData(), UTF_8));
+            log.info("Receive txn shared id: {}, msg: {}", 
message.getMessageId(), new String(message.getData()));
         }
         log.info("TransactionConsumeTest noSortedTest finish.");
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
index 75f3bae..47f0ad5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
@@ -21,32 +21,38 @@ package org.apache.pulsar.client.transaction;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.common.collect.Sets;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+
 /**
  * End to end transaction test.
  */
@@ -91,66 +97,81 @@ public class EndToEndTest extends TransactionTestBase {
     }
 
     @Test
-    public void partitionCommitTest() throws Exception {
-        Transaction txn = getTxn();
-
-        @Cleanup
-        PartitionedProducerImpl<byte[]> producer = 
(PartitionedProducerImpl<byte[]>) pulsarClient
-                .newProducer()
-                .topic(TOPIC_OUTPUT)
-                .sendTimeout(0, TimeUnit.SECONDS)
-                .enableBatching(false)
-                .create();
-
-        int messageCnt = 10;
-        for (int i = 0; i < messageCnt; i++) {
-            producer.newMessage(txn).value(("Hello Txn - " + 
i).getBytes(UTF_8)).sendAsync();
-        }
+    public void noBatchProduceCommitTest() throws Exception {
+        produceCommitTest(false);
+    }
 
+    private void produceCommitTest(boolean enableBatch) throws Exception {
         @Cleanup
         MultiTopicsConsumerImpl<byte[]> consumer = 
(MultiTopicsConsumerImpl<byte[]>) pulsarClient
                 .newConsumer()
                 .topic(TOPIC_OUTPUT)
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscriptionName("test")
                 .enableBatchIndexAcknowledgment(true)
                 .subscribe();
 
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient
+                .newProducer()
+                .topic(TOPIC_OUTPUT)
+                .enableBatching(enableBatch)
+                .sendTimeout(0, TimeUnit.SECONDS);
+        if (enableBatch) {
+            producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
+        }
+        @Cleanup
+        PartitionedProducerImpl<byte[]> producer = 
(PartitionedProducerImpl<byte[]>) producerBuilder.create();
+
+        Transaction txn1 = getTxn();
+        Transaction txn2 = getTxn();
+
+        int messageCnt = 20;
+        for (int i = 0; i < messageCnt; i++) {
+            if (i % 2 == 0) {
+                producer.newMessage(txn1).value(("Hello Txn - " + 
i).getBytes(UTF_8)).sendAsync();
+            } else {
+                producer.newMessage(txn2).value(("Hello Txn - " + 
i).getBytes(UTF_8)).sendAsync();
+            }
+        }
+
         // Can't receive transaction messages before commit.
         Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
         Assert.assertNull(message);
 
-        txn.commit().get();
+        txn1.commit().get();
 
+        // txn1 messages could be received after txn1 committed
         int receiveCnt = 0;
-        Set<MessageId> firstReceivedMessageIdList = Sets.newHashSet();
-        for (int i = 0; i < messageCnt; i++) {
-            message = consumer.receive(5, TimeUnit.SECONDS);
+        for (int i = 0; i < messageCnt / 2; i++) {
+            message = consumer.receive();
             Assert.assertNotNull(message);
-            firstReceivedMessageIdList.add(message.getMessageId());
             log.info("receive msgId: {}, msg: {}", message.getMessageId(), new 
String(message.getData(), UTF_8));
             receiveCnt ++;
         }
-        Assert.assertEquals(messageCnt, receiveCnt);
+        Assert.assertEquals(messageCnt / 2, receiveCnt);
 
-        consumer.redeliverUnacknowledgedMessages();
+        message = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+
+        txn2.commit().get();
 
+        // txn2 messages could be received after txn2 committed
         receiveCnt = 0;
-        for (int i = 0; i < messageCnt; i++) {
-            message = consumer.receive(5, TimeUnit.SECONDS);
+        for (int i = 0; i < messageCnt / 2; i++) {
+            message = consumer.receive();
             Assert.assertNotNull(message);
-            
Assert.assertTrue(firstReceivedMessageIdList.remove(message.getMessageId()));
-            log.info("second receive msgId: {}, msg: {}", 
message.getMessageId(), new String(message.getData(), UTF_8));
+            log.info("receive second msgId: {}, msg: {}", 
message.getMessageId(), new String(message.getData(), UTF_8));
             receiveCnt ++;
         }
-        Assert.assertEquals(messageCnt, receiveCnt);
-        Assert.assertEquals(firstReceivedMessageIdList.size(), 0);
+        Assert.assertEquals(messageCnt / 2, receiveCnt);
 
-        log.info("receive transaction messages count: {}", receiveCnt);
+        message = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+
+        log.info("message commit test enableBatch {}", enableBatch);
     }
 
     @Test
-    public void partitionAbortTest() throws Exception {
+    public void produceAbortTest() throws Exception {
         Transaction txn = getTxn();
 
         @Cleanup
@@ -189,7 +210,7 @@ public class EndToEndTest extends TransactionTestBase {
     }
 
     @Test
-    public void batchDisableAndSharedSubTest() throws Exception {
+    public void txnAckTestNoBatchAndSharedSub() throws Exception {
         txnAckTest(false, 1, SubscriptionType.Shared);
     }
 
@@ -268,6 +289,93 @@ public class EndToEndTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void txnMessageAckTest() throws Exception {
+        final String subName = "test";
+        @Cleanup
+        MultiTopicsConsumerImpl<byte[]> consumer = 
(MultiTopicsConsumerImpl<byte[]>) pulsarClient
+                .newConsumer()
+                .topic(TOPIC_OUTPUT)
+                .subscriptionName(subName)
+                .enableBatchIndexAcknowledgment(true)
+                .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
+                .subscribe();
+
+        @Cleanup
+        PartitionedProducerImpl<byte[]> producer = 
(PartitionedProducerImpl<byte[]>) pulsarClient
+                .newProducer()
+                .topic(TOPIC_OUTPUT)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .create();
+
+        Transaction txn = getTxn();
+
+        int messageCnt = 10;
+        for (int i = 0; i < messageCnt; i++) {
+            producer.newMessage(txn).value(("Hello Txn - " + 
i).getBytes(UTF_8)).sendAsync();
+        }
+        log.info("produce transaction messages finished");
+
+        // Can't receive transaction messages before commit.
+        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+        log.info("transaction messages can't be received before transaction 
committed");
+
+        txn.commit().get();
+
+        Map<Integer, MessageIdImpl> messageIdMap = new HashMap<>();
+        int ackedMessageCount = 0;
+        int receiveCnt = 0;
+        for (int i = 0; i < messageCnt; i++) {
+            message = consumer.receive();
+            if (i % 2 == 0) {
+                consumer.acknowledge(message);
+                ackedMessageCount ++;
+            }
+            Assert.assertNotNull(message);
+            receiveCnt ++;
+
+            MessageIdImpl messageId;
+            if (message.getMessageId() instanceof TopicMessageIdImpl) {
+                messageId = (MessageIdImpl) ((TopicMessageIdImpl) 
message.getMessageId()).getInnerMessageId();
+            } else {
+                messageId = (MessageIdImpl) message.getMessageId();
+            }
+            messageIdMap.put(messageId.getPartitionIndex(), messageId);
+        }
+        Assert.assertEquals(messageCnt, receiveCnt);
+
+        for (int i = 0; i < TOPIC_PARTITION; i++) {
+            Assert.assertEquals(
+                    messageIdMap.get(i).getLedgerId() + ":-1",
+                    getMarkDeletePosition(TOPIC_OUTPUT, i, subName));
+        }
+
+        consumer.redeliverUnacknowledgedMessages();
+
+        receiveCnt = 0;
+        for (int i = 0; i < messageCnt - ackedMessageCount; i++) {
+            message = consumer.receive(2, TimeUnit.SECONDS);
+            log.info("second receive messageId: {}", message.getMessageId());
+            Assert.assertNotNull(message);
+            consumer.acknowledge(message);
+            receiveCnt ++;
+        }
+        Assert.assertEquals(messageCnt - ackedMessageCount, receiveCnt);
+
+        message = consumer.receive(2, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+
+        for (int i = 0; i < TOPIC_PARTITION; i++) {
+            Assert.assertEquals(
+                    messageIdMap.get(i).getLedgerId() + ":" + 
messageIdMap.get(i).getEntryId(),
+                    getMarkDeletePosition(TOPIC_OUTPUT, i, subName));
+        }
+
+        log.info("receive transaction messages count: {}", receiveCnt);
+    }
+
     private Transaction getTxn() throws Exception {
         return ((PulsarClientImpl) pulsarClient)
                 .newTransaction()
@@ -276,4 +384,10 @@ public class EndToEndTest extends TransactionTestBase {
                 .get();
     }
 
+    private String getMarkDeletePosition(String topic, Integer partition, 
String subName) throws Exception {
+        topic = TopicName.get(topic).getPartition(partition).toString();
+        PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+        return stats.cursors.get(subName).markDeletePosition;
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a327c4d..bbcd83b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1150,7 +1150,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     msgMetadata.recycle();
                     return;
                 }
-                msgId = new BatchMessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), getPartitionIndex(), messageId.getBatchIndex(), -1, 
BatchMessageAckerDisabled.INSTANCE);
+                BatchMessageAcker batchMessageAcker = 
BatchMessageAcker.newAcker(ackBitSet);
+                msgId = new BatchMessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), getPartitionIndex(), messageId.getBatchIndex(), -1, 
batchMessageAcker);
             }
 
             final MessageImpl<T> message = new 
MessageImpl<>(topicName.toString(), msgId, msgMetadata,

Reply via email to