This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 77fca3e [Transaction] Fix transaction message ack (#8007)
77fca3e is described below
commit 77fca3e59044a723cc4a06b525db6a869330bae1
Author: ran <[email protected]>
AuthorDate: Wed Sep 16 17:47:03 2020 +0800
[Transaction] Fix transaction message ack (#8007)
### Motivation
Acknowledgment of transaction messages does not work correctly.
### Modifications
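Fix transaction message acknowledgment:
- The dispatchers retry the read after 1 ms, logging at debug level, when the failure is caused by a TransactionNotSealedException.
- TransactionReader resets the reader and completes the read with an empty list when it hits an EndOfTransactionException.
- PersistentTransactionBufferReader throws TransactionNotSealedException unless the transaction status is COMMITTED.
- ConsumerImpl builds the BatchMessageIdImpl with a BatchMessageAcker created from the ack bit set instead of BatchMessageAckerDisabled.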
---
.../PersistentDispatcherMultipleConsumers.java | 7 +
.../PersistentDispatcherSingleActiveConsumer.java | 7 +
.../service/persistent/TransactionReader.java | 32 ++--
.../impl/PersistentTransactionBufferReader.java | 2 +-
.../broker/transaction/TransactionConsumeTest.java | 18 +-
.../pulsar/client/transaction/EndToEndTest.java | 182 +++++++++++++++++----
.../apache/pulsar/client/impl/ConsumerImpl.java | 3 +-
7 files changed, 197 insertions(+), 54 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a053681..37c5d9d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -59,6 +59,7 @@ import
org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
+import
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -571,6 +572,12 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
// Notify the consumer only if all the messages were already
acknowledged
consumerList.forEach(Consumer::reachedEndOfTopic);
}
+ } else if (exception.getCause() instanceof
TransactionNotSealedException) {
+ waitTimeMillis = 1;
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Error reading transaction entries : {}, Read
Type {} - Retrying to read in {} seconds",
+ name, exception.getMessage(), readType, waitTimeMillis
/ 1000.0);
+ }
} else if (!(exception instanceof TooManyRequestsException)) {
log.error("[{}] Error reading entries at {} : {}, Read Type {} -
Retrying to read in {} seconds", name,
cursor.getReadPosition(), exception.getMessage(),
readType, waitTimeMillis / 1000.0);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 0e6d747..e71591e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -49,6 +49,7 @@ import
org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
+import
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
@@ -479,6 +480,12 @@ public final class
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
// Notify the consumer only if all the messages were already
acknowledged
consumers.forEach(Consumer::reachedEndOfTopic);
}
+ } else if (exception.getCause() instanceof
TransactionNotSealedException) {
+ waitTimeMillis = 1;
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Error reading transaction entries : {}, -
Retrying to read in {} seconds", name,
+ exception.getMessage(), waitTimeMillis / 1000.0);
+ }
} else if (!(exception instanceof TooManyRequestsException)) {
log.error("[{}-{}] Error reading entries at {} : {} - Retrying to
read in {} seconds", name, c,
cursor.getReadPosition(), exception.getMessage(),
waitTimeMillis / 1000.0);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java
index b954506..d5455a0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java
@@ -20,23 +20,21 @@ package org.apache.pulsar.broker.service.persistent;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.collect.Queues;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
-import org.apache.pulsar.broker.transaction.buffer.TransactionEntry;
+import
org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;
+import
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.api.transaction.TxnID;
/**
@@ -112,6 +110,15 @@ public class TransactionReader {
}
transactionBufferReader.thenAccept(reader -> {
reader.readNext(readMessageNum).whenComplete((transactionEntries,
throwable) -> {
+ if (throwable != null && throwable.getCause() instanceof
EndOfTransactionException) {
+ if (log.isDebugEnabled()) {
+ log.debug("transaction {} read finished.", txnID);
+ }
+ resetReader(txnID, reader);
+
readEntriesCallback.readEntriesComplete(Collections.EMPTY_LIST, ctx);
+ return;
+ }
+
if (throwable != null) {
log.error("Read transaction messages failed.", throwable);
readEntriesCallback.readEntriesFailed(
@@ -131,15 +138,20 @@ public class TransactionReader {
transactionEntries.get(0).committedAtEntryId()),
transactionEntries.get(0).numMessageInTxn());
- if (transactionEntries.size() < readMessageNum) {
- resetReader(txnID, reader);
- }
readEntriesCallback.readEntriesComplete(new
ArrayList<>(transactionEntries), ctx);
});
}).exceptionally(throwable -> {
- log.error("Open transactionBufferReader failed.", throwable);
- readEntriesCallback.readEntriesFailed(
-
ManagedLedgerException.getManagedLedgerException(throwable), ctx);
+ transactionBufferReader = null;
+ if (throwable.getCause() instanceof TransactionNotSealedException)
{
+ if (log.isDebugEnabled()) {
+ log.debug("transaction {} is not sealed, failed to open
transactionBufferReader.", txnID);
+ }
+ readEntriesCallback.readEntriesFailed(
+
ManagedLedgerException.getManagedLedgerException(throwable.getCause()), ctx);
+ return null;
+ }
+ log.error("open transactionBufferReader failed.", throwable);
+
readEntriesCallback.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable),
ctx);
return null;
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java
index 0f1ab70..6956cfb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java
@@ -56,7 +56,7 @@ public class PersistentTransactionBufferReader implements
TransactionBufferReade
PersistentTransactionBufferReader(TransactionMeta meta, ManagedLedger
ledger)
throws TransactionNotSealedException {
- if (TxnStatus.OPEN == meta.status()) {
+ if (TxnStatus.COMMITTED != meta.status()) {
throw new TransactionNotSealedException("Transaction `" +
meta.id() + "` is not sealed yet");
}
this.meta = meta;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index ac70225..b2f3221 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -114,10 +114,11 @@ public class TransactionConsumeTest extends
TransactionTestBase {
appendTransactionMessages(txnID, transactionBuffer,
transactionMessageCnt);
sendNormalMessages(producer, messageCntBeforeTxn, messageCntAfterTxn);
+ Message<byte[]> message;
for (int i = 0; i < totalMsgCnt; i++) {
if (i < (messageCntBeforeTxn + messageCntAfterTxn)) {
// receive normal messages successfully
- Message<byte[]> message = exclusiveConsumer.receive(2,
TimeUnit.SECONDS);
+ message = exclusiveConsumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
log.info("Receive exclusive normal msg: {}" + new
String(message.getData(), UTF_8));
message = sharedConsumer.receive(2, TimeUnit.SECONDS);
@@ -125,33 +126,34 @@ public class TransactionConsumeTest extends
TransactionTestBase {
log.info("Receive shared normal msg: {}" + new
String(message.getData(), UTF_8));
} else {
// can't receive transaction messages before commit
- Message<byte[]> message = exclusiveConsumer.receive(2,
TimeUnit.SECONDS);
+ message = exclusiveConsumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
+ log.info("exclusive consumer can't receive message before
commit.");
+
message = sharedConsumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
- log.info("Can't receive message before commit.");
+ log.info("shared consumer can't receive message before
commit.");
}
}
- transactionBuffer.endTxnOnPartition(txnID,
PulsarApi.TxnAction.COMMIT.getNumber());
- Thread.sleep(1000);
+ transactionBuffer.endTxnOnPartition(txnID,
PulsarApi.TxnAction.COMMIT.getNumber()).get();
log.info("Commit txn.");
Map<String, Integer> exclusiveBatchIndexMap = new HashMap<>();
Map<String, Integer> sharedBatchIndexMap = new HashMap<>();
// receive transaction messages successfully after commit
for (int i = 0; i < transactionMessageCnt; i++) {
- Message<byte[]> message = exclusiveConsumer.receive(2,
TimeUnit.SECONDS);
+ message = exclusiveConsumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Assert.assertTrue(message.getMessageId() instanceof
BatchMessageIdImpl);
checkBatchIndex(exclusiveBatchIndexMap, (BatchMessageIdImpl)
message.getMessageId());
log.info("Receive txn exclusive id: {}, msg: {}",
message.getMessageId(), new String(message.getData()));
- message = sharedConsumer.receive(2, TimeUnit.SECONDS);
+ message = sharedConsumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Assert.assertTrue(message.getMessageId() instanceof
BatchMessageIdImpl);
checkBatchIndex(sharedBatchIndexMap, (BatchMessageIdImpl)
message.getMessageId());
- log.info("Receive txn shared id: {}, msg: {}",
message.getMessageId(), new String(message.getData(), UTF_8));
+ log.info("Receive txn shared id: {}, msg: {}",
message.getMessageId(), new String(message.getData()));
}
log.info("TransactionConsumeTest noSortedTest finish.");
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
index 75f3bae..47f0ad5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
@@ -21,32 +21,38 @@ package org.apache.pulsar.client.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.collect.Sets;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+
/**
* End to end transaction test.
*/
@@ -91,66 +97,81 @@ public class EndToEndTest extends TransactionTestBase {
}
@Test
- public void partitionCommitTest() throws Exception {
- Transaction txn = getTxn();
-
- @Cleanup
- PartitionedProducerImpl<byte[]> producer =
(PartitionedProducerImpl<byte[]>) pulsarClient
- .newProducer()
- .topic(TOPIC_OUTPUT)
- .sendTimeout(0, TimeUnit.SECONDS)
- .enableBatching(false)
- .create();
-
- int messageCnt = 10;
- for (int i = 0; i < messageCnt; i++) {
- producer.newMessage(txn).value(("Hello Txn - " +
i).getBytes(UTF_8)).sendAsync();
- }
+ public void noBatchProduceCommitTest() throws Exception {
+ produceCommitTest(false);
+ }
+ private void produceCommitTest(boolean enableBatch) throws Exception {
@Cleanup
MultiTopicsConsumerImpl<byte[]> consumer =
(MultiTopicsConsumerImpl<byte[]>) pulsarClient
.newConsumer()
.topic(TOPIC_OUTPUT)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test")
.enableBatchIndexAcknowledgment(true)
.subscribe();
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient
+ .newProducer()
+ .topic(TOPIC_OUTPUT)
+ .enableBatching(enableBatch)
+ .sendTimeout(0, TimeUnit.SECONDS);
+ if (enableBatch) {
+ producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
+ }
+ @Cleanup
+ PartitionedProducerImpl<byte[]> producer =
(PartitionedProducerImpl<byte[]>) producerBuilder.create();
+
+ Transaction txn1 = getTxn();
+ Transaction txn2 = getTxn();
+
+ int messageCnt = 20;
+ for (int i = 0; i < messageCnt; i++) {
+ if (i % 2 == 0) {
+ producer.newMessage(txn1).value(("Hello Txn - " +
i).getBytes(UTF_8)).sendAsync();
+ } else {
+ producer.newMessage(txn2).value(("Hello Txn - " +
i).getBytes(UTF_8)).sendAsync();
+ }
+ }
+
// Can't receive transaction messages before commit.
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
- txn.commit().get();
+ txn1.commit().get();
+ // txn1 messages could be received after txn1 committed
int receiveCnt = 0;
- Set<MessageId> firstReceivedMessageIdList = Sets.newHashSet();
- for (int i = 0; i < messageCnt; i++) {
- message = consumer.receive(5, TimeUnit.SECONDS);
+ for (int i = 0; i < messageCnt / 2; i++) {
+ message = consumer.receive();
Assert.assertNotNull(message);
- firstReceivedMessageIdList.add(message.getMessageId());
log.info("receive msgId: {}, msg: {}", message.getMessageId(), new
String(message.getData(), UTF_8));
receiveCnt ++;
}
- Assert.assertEquals(messageCnt, receiveCnt);
+ Assert.assertEquals(messageCnt / 2, receiveCnt);
- consumer.redeliverUnacknowledgedMessages();
+ message = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+
+ txn2.commit().get();
+ // txn2 messages could be received after txn2 committed
receiveCnt = 0;
- for (int i = 0; i < messageCnt; i++) {
- message = consumer.receive(5, TimeUnit.SECONDS);
+ for (int i = 0; i < messageCnt / 2; i++) {
+ message = consumer.receive();
Assert.assertNotNull(message);
-
Assert.assertTrue(firstReceivedMessageIdList.remove(message.getMessageId()));
- log.info("second receive msgId: {}, msg: {}",
message.getMessageId(), new String(message.getData(), UTF_8));
+ log.info("receive second msgId: {}, msg: {}",
message.getMessageId(), new String(message.getData(), UTF_8));
receiveCnt ++;
}
- Assert.assertEquals(messageCnt, receiveCnt);
- Assert.assertEquals(firstReceivedMessageIdList.size(), 0);
+ Assert.assertEquals(messageCnt / 2, receiveCnt);
- log.info("receive transaction messages count: {}", receiveCnt);
+ message = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+
+ log.info("message commit test enableBatch {}", enableBatch);
}
@Test
- public void partitionAbortTest() throws Exception {
+ public void produceAbortTest() throws Exception {
Transaction txn = getTxn();
@Cleanup
@@ -189,7 +210,7 @@ public class EndToEndTest extends TransactionTestBase {
}
@Test
- public void batchDisableAndSharedSubTest() throws Exception {
+ public void txnAckTestNoBatchAndSharedSub() throws Exception {
txnAckTest(false, 1, SubscriptionType.Shared);
}
@@ -268,6 +289,93 @@ public class EndToEndTest extends TransactionTestBase {
}
}
+ @Test
+ public void txnMessageAckTest() throws Exception {
+ final String subName = "test";
+ @Cleanup
+ MultiTopicsConsumerImpl<byte[]> consumer =
(MultiTopicsConsumerImpl<byte[]>) pulsarClient
+ .newConsumer()
+ .topic(TOPIC_OUTPUT)
+ .subscriptionName(subName)
+ .enableBatchIndexAcknowledgment(true)
+ .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
+ .subscribe();
+
+ @Cleanup
+ PartitionedProducerImpl<byte[]> producer =
(PartitionedProducerImpl<byte[]>) pulsarClient
+ .newProducer()
+ .topic(TOPIC_OUTPUT)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .create();
+
+ Transaction txn = getTxn();
+
+ int messageCnt = 10;
+ for (int i = 0; i < messageCnt; i++) {
+ producer.newMessage(txn).value(("Hello Txn - " +
i).getBytes(UTF_8)).sendAsync();
+ }
+ log.info("produce transaction messages finished");
+
+ // Can't receive transaction messages before commit.
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+ log.info("transaction messages can't be received before transaction
committed");
+
+ txn.commit().get();
+
+ Map<Integer, MessageIdImpl> messageIdMap = new HashMap<>();
+ int ackedMessageCount = 0;
+ int receiveCnt = 0;
+ for (int i = 0; i < messageCnt; i++) {
+ message = consumer.receive();
+ if (i % 2 == 0) {
+ consumer.acknowledge(message);
+ ackedMessageCount ++;
+ }
+ Assert.assertNotNull(message);
+ receiveCnt ++;
+
+ MessageIdImpl messageId;
+ if (message.getMessageId() instanceof TopicMessageIdImpl) {
+ messageId = (MessageIdImpl) ((TopicMessageIdImpl)
message.getMessageId()).getInnerMessageId();
+ } else {
+ messageId = (MessageIdImpl) message.getMessageId();
+ }
+ messageIdMap.put(messageId.getPartitionIndex(), messageId);
+ }
+ Assert.assertEquals(messageCnt, receiveCnt);
+
+ for (int i = 0; i < TOPIC_PARTITION; i++) {
+ Assert.assertEquals(
+ messageIdMap.get(i).getLedgerId() + ":-1",
+ getMarkDeletePosition(TOPIC_OUTPUT, i, subName));
+ }
+
+ consumer.redeliverUnacknowledgedMessages();
+
+ receiveCnt = 0;
+ for (int i = 0; i < messageCnt - ackedMessageCount; i++) {
+ message = consumer.receive(2, TimeUnit.SECONDS);
+ log.info("second receive messageId: {}", message.getMessageId());
+ Assert.assertNotNull(message);
+ consumer.acknowledge(message);
+ receiveCnt ++;
+ }
+ Assert.assertEquals(messageCnt - ackedMessageCount, receiveCnt);
+
+ message = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+
+ for (int i = 0; i < TOPIC_PARTITION; i++) {
+ Assert.assertEquals(
+ messageIdMap.get(i).getLedgerId() + ":" +
messageIdMap.get(i).getEntryId(),
+ getMarkDeletePosition(TOPIC_OUTPUT, i, subName));
+ }
+
+ log.info("receive transaction messages count: {}", receiveCnt);
+ }
+
private Transaction getTxn() throws Exception {
return ((PulsarClientImpl) pulsarClient)
.newTransaction()
@@ -276,4 +384,10 @@ public class EndToEndTest extends TransactionTestBase {
.get();
}
+ private String getMarkDeletePosition(String topic, Integer partition,
String subName) throws Exception {
+ topic = TopicName.get(topic).getPartition(partition).toString();
+ PersistentTopicInternalStats stats =
admin.topics().getInternalStats(topic);
+ return stats.cursors.get(subName).markDeletePosition;
+ }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a327c4d..bbcd83b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1150,7 +1150,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
msgMetadata.recycle();
return;
}
- msgId = new BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex(), messageId.getBatchIndex(), -1,
BatchMessageAckerDisabled.INSTANCE);
+ BatchMessageAcker batchMessageAcker =
BatchMessageAcker.newAcker(ackBitSet);
+ msgId = new BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex(), messageId.getBatchIndex(), -1,
batchMessageAcker);
}
final MessageImpl<T> message = new
MessageImpl<>(topicName.toString(), msgId, msgMetadata,