This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 9ccb3ac2f09 [fix][txn] Ack all message ids when ack chunk messages
with transaction. (#21268)
9ccb3ac2f09 is described below
commit 9ccb3ac2f0902d373640a433f2eb91bfb4153e48
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Nov 8 11:51:03 2023 +0800
[fix][txn] Ack all message ids when ack chunk messages with transaction.
(#21268)
Currently, only the last chunk is acknowledged when a chunked
message is acknowledged with a transaction.
If the messageId is a `ChunkMessageIdImpl`, its ledger ID and entry ID are
those of the `lastChunkMsgId`, so the earlier chunks are never acknowledged.
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33
Follow the common message acknowledgment logic: ack all the chunks when
acknowledging chunked messages with transactions.
(cherry picked from commit f5814176efc08dc05553cf1059b3d55f6bcb2b6b)
---
.../broker/transaction/TransactionConsumeTest.java | 50 +++++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 101 ++++++++++++++++++---
2 files changed, 137 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index 6ff3053e2bb..5fb3e40d104 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -19,7 +19,8 @@
package org.apache.pulsar.broker.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
-
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -29,6 +30,8 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -39,12 +42,15 @@ import
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
@@ -367,4 +373,46 @@ public class TransactionConsumeTest extends
TransactionTestBase {
return positionList;
}
+ @Test
+ public void testAckChunkMessage() throws Exception {
+ String producerName = "test-producer";
+ String subName = "testAckChunkMessage";
+ @Cleanup
+ PulsarClient pulsarClient1 =
PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .enableTransaction(true).build();
+ @Cleanup
+ Producer<String> producer = pulsarClient1
+ .newProducer(Schema.STRING)
+ .producerName(producerName)
+ .topic(CONSUME_TOPIC)
+ .enableChunking(true)
+ .enableBatching(false)
+ .create();
+ Consumer<String> consumer = pulsarClient1
+ .newConsumer(Schema.STRING)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .topic(CONSUME_TOPIC)
+ .subscriptionName(subName)
+ .subscribe();
+
+ int messageSize = 6000; // payload size in KB
+ String message = Stream.generate(() -> "a").limit(messageSize *
1000).collect(Collectors.joining());
+
+ MessageId messageId = producer.newMessage().value(message).send();
+ assertTrue(messageId instanceof ChunkMessageIdImpl);
+ assertNotEquals(((ChunkMessageIdImpl)
messageId).getLastChunkMessageId(),
+ ((ChunkMessageIdImpl) messageId).getFirstChunkMessageId());
+
+ Transaction transaction = pulsarClient1.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.HOURS)
+ .build()
+ .get();
+
+ Message<String> msg = consumer.receive();
+ consumer.acknowledgeAsync(msg.getMessageId());
+ transaction.commit().get();
+
+
Assert.assertEquals(admin.topics().getStats(CONSUME_TOPIC).getSubscriptions().get(subName)
+ .getBacklogSize(), 0);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 62cd67d321f..45ecb34f15d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
+import static org.apache.pulsar.common.protocol.Commands.serializeWithSize;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables;
@@ -31,12 +32,14 @@ import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
+import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -62,6 +65,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
@@ -86,7 +90,9 @@ import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
import org.apache.pulsar.common.api.proto.CommandMessage;
@@ -108,6 +114,7 @@ import
org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
@@ -2772,15 +2779,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
ValidationError validationError,
Map<String, Long> properties, TxnID txnID) {
BitSetRecyclable bitSetRecyclable = null;
- long ledgerId;
- long entryId;
- ByteBuf cmd;
+ final MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
+ final long ledgerId = messageIdImpl.getLedgerId();
+ final long entryId = messageIdImpl.getEntryId();
+ final List<ByteBuf> cmdList;
long requestId = client.newRequestId();
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
bitSetRecyclable = BitSetRecyclable.create();
- ledgerId = batchMessageId.getLedgerId();
- entryId = batchMessageId.getEntryId();
if (ackType == AckType.Cumulative) {
batchMessageId.ackCumulative();
bitSetRecyclable.set(0, batchMessageId.getBatchSize());
@@ -2789,15 +2795,37 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
bitSetRecyclable.set(0, batchMessageId.getBatchSize());
bitSetRecyclable.clear(batchMessageId.getBatchIndex());
}
- cmd = Commands.newAck(consumerId, ledgerId, entryId,
bitSetRecyclable, ackType, validationError, properties,
- txnID.getLeastSigBits(), txnID.getMostSigBits(),
requestId, batchMessageId.getBatchSize());
+ cmdList = Collections.singletonList(Commands.newAck(consumerId,
ledgerId, entryId, bitSetRecyclable,
+ ackType, validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId,
+ batchMessageId.getBatchSize()));
bitSetRecyclable.recycle();
} else {
- MessageIdImpl singleMessage = (MessageIdImpl) messageId;
- ledgerId = singleMessage.getLedgerId();
- entryId = singleMessage.getEntryId();
- cmd = Commands.newAck(consumerId, ledgerId, entryId,
bitSetRecyclable, ackType,
- validationError, properties, txnID.getLeastSigBits(),
txnID.getMostSigBits(), requestId);
+ MessageIdImpl[] chunkMsgIds =
this.unAckedChunkedMessageIdSequenceMap.remove(messageIdImpl);
+ // cumulative ack chunk by the last messageId
+ if (chunkMsgIds == null || ackType == AckType.Cumulative) {
+ cmdList =
Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, null,
ackType,
+ validationError, properties, txnID.getLeastSigBits(),
txnID.getMostSigBits(), requestId));
+ } else {
+ if (Commands.peerSupportsMultiMessageAcknowledgment(
+ getClientCnx().getRemoteEndpointProtocolVersion())) {
+ List<Triple<Long, Long, ConcurrentBitSetRecyclable>>
entriesToAck =
+ new ArrayList<>(chunkMsgIds.length);
+ for (MessageIdImpl cMsgId : chunkMsgIds) {
+ if (cMsgId != null && chunkMsgIds.length > 1) {
+ entriesToAck.add(Triple.of(cMsgId.getLedgerId(),
cMsgId.getEntryId(), null));
+ }
+ }
+ cmdList = Collections.singletonList(
+ newMultiTransactionMessageAck(consumerId, txnID,
entriesToAck, requestId));
+ } else {
+ cmdList = new ArrayList<>();
+ for (MessageIdImpl cMsgId : chunkMsgIds) {
+ cmdList.add(Commands.newAck(consumerId,
cMsgId.ledgerId, cMsgId.entryId, null, ackType,
+ validationError, properties,
+ txnID.getLeastSigBits(),
txnID.getMostSigBits(), requestId));
+ }
+ }
+ }
}
if (ackType == AckType.Cumulative) {
@@ -2811,8 +2839,55 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
.ConnectException("Failed to ack message [" + messageId +
"] "
+ "for transaction [" + txnID + "] due to consumer connect
fail, consumer state: " + getState()));
} else {
- return cnx.newAckForReceipt(cmd, requestId);
+ List<CompletableFuture<Void>> completableFutures = new
LinkedList<>();
+ cmdList.forEach(cmd ->
completableFutures.add(cnx.newAckForReceipt(cmd, requestId)));
+ return FutureUtil.waitForAll(completableFutures);
+ }
+ }
+
+ private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
+ List<Triple<Long, Long,
ConcurrentBitSetRecyclable>> entries,
+ long requestID) {
+ BaseCommand cmd = newMultiMessageAckCommon(entries);
+ cmd.getAck()
+ .setConsumerId(consumerId)
+ .setAckType(AckType.Individual)
+ .setTxnidLeastBits(txnID.getLeastSigBits())
+ .setTxnidMostBits(txnID.getMostSigBits())
+ .setRequestId(requestID);
+ return serializeWithSize(cmd);
+ }
+
+ private static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new
FastThreadLocal<BaseCommand>() {
+ @Override
+ protected BaseCommand initialValue() throws Exception {
+ return new BaseCommand();
+ }
+ };
+
+ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long,
Long, ConcurrentBitSetRecyclable>> entries) {
+ BaseCommand cmd = LOCAL_BASE_COMMAND.get()
+ .clear()
+ .setType(BaseCommand.Type.ACK);
+ CommandAck ack = cmd.setAck();
+ int entriesCount = entries.size();
+ for (int i = 0; i < entriesCount; i++) {
+ long ledgerId = entries.get(i).getLeft();
+ long entryId = entries.get(i).getMiddle();
+ ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
+ MessageIdData msgId = ack.addMessageId()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId);
+ if (bitSet != null) {
+ long[] ackSet = bitSet.toLongArray();
+ for (int j = 0; j < ackSet.length; j++) {
+ msgId.addAckSet(ackSet[j]);
+ }
+ bitSet.recycle();
+ }
}
+
+ return cmd;
}
public Map<MessageIdImpl, List<MessageImpl<T>>>
getPossibleSendToDeadLetterTopicMessages() {