This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 9ccb3ac2f09 [fix][txn] Ack all message ids when ack chunk messages 
with transaction. (#21268)
9ccb3ac2f09 is described below

commit 9ccb3ac2f0902d373640a433f2eb91bfb4153e48
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Nov 8 11:51:03 2023 +0800

    [fix][txn] Ack all message ids when ack chunk messages with transaction. 
(#21268)
    
    Currently, only the last chunk is acknowledged when a chunked message is 
acknowledged with a transaction.
    If the messageId is a `ChunkMessageIdImpl`, its ledger ID and entry ID are 
those of the last chunk (`lastChunkMsgId`).
    
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814
    
    
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33
    
    Follow the common message acknowledgment logic: ack all the chunks when 
acknowledging chunked messages with transactions.
    
    (cherry picked from commit f5814176efc08dc05553cf1059b3d55f6bcb2b6b)
---
 .../broker/transaction/TransactionConsumeTest.java |  50 +++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 101 ++++++++++++++++++---
 2 files changed, 137 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index 6ff3053e2bb..5fb3e40d104 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -19,7 +19,8 @@
 package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -29,6 +30,8 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -39,12 +42,15 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.TxnAction;
@@ -367,4 +373,46 @@ public class TransactionConsumeTest extends 
TransactionTestBase {
         return positionList;
     }
 
+    @Test
+    public void testAckChunkMessage() throws Exception { // a txn ack of a chunked message must ack every chunk
+        String producerName = "test-producer";
+        String subName = "testAckChunkMessage";
+        @Cleanup
+        PulsarClient pulsarClient1 = 
PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .enableTransaction(true).build();
+        @Cleanup
+        Producer<String> producer = pulsarClient1
+                .newProducer(Schema.STRING)
+                .producerName(producerName)
+                .topic(CONSUME_TOPIC)
+                .enableChunking(true)
+                .enableBatching(false)
+                .create();
+        Consumer<String> consumer = pulsarClient1
+                .newConsumer(Schema.STRING)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .topic(CONSUME_TOPIC)
+                .subscriptionName(subName)
+                .subscribe();
+
+        int messageSize = 6000; // payload size in KB
+        String message = Stream.generate(() -> "a").limit(messageSize * 
1000).collect(Collectors.joining());
+
+        MessageId messageId = producer.newMessage().value(message).send();
+        assertTrue(messageId instanceof ChunkMessageIdImpl);
+        assertNotEquals(((ChunkMessageIdImpl) 
messageId).getLastChunkMessageId(),
+                ((ChunkMessageIdImpl) messageId).getFirstChunkMessageId()); // first != last: more than one chunk
+
+        Transaction transaction = pulsarClient1.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build()
+                .get();
+
+        Message<String> msg = consumer.receive();
+        consumer.acknowledgeAsync(msg.getMessageId(), transaction); // ack inside the txn; a plain ack skips the path under test
+        transaction.commit().get();
+
+        
Assert.assertEquals(admin.topics().getStats(CONSUME_TOPIC).getSubscriptions().get(subName)
+                .getBacklogSize(), 0); // nothing may remain in the backlog once the txn is committed
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 62cd67d321f..45ecb34f15d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
+import static org.apache.pulsar.common.protocol.Commands.serializeWithSize;
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
@@ -31,12 +32,14 @@ import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.Timeout;
+import io.netty.util.concurrent.FastThreadLocal;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -62,6 +65,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
@@ -86,7 +90,9 @@ import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
 import org.apache.pulsar.common.api.proto.CommandMessage;
@@ -108,6 +114,7 @@ import 
org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
@@ -2772,15 +2779,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                                                         
ValidationError validationError,
                                                                         
Map<String, Long> properties, TxnID txnID) {
         BitSetRecyclable bitSetRecyclable = null;
-        long ledgerId;
-        long entryId;
-        ByteBuf cmd;
+        final MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
+        final long ledgerId = messageIdImpl.getLedgerId();
+        final long entryId = messageIdImpl.getEntryId();
+        final List<ByteBuf> cmdList;
         long requestId = client.newRequestId();
         if (messageId instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
             bitSetRecyclable = BitSetRecyclable.create();
-            ledgerId = batchMessageId.getLedgerId();
-            entryId = batchMessageId.getEntryId();
             if (ackType == AckType.Cumulative) {
                 batchMessageId.ackCumulative();
                 bitSetRecyclable.set(0, batchMessageId.getBatchSize());
@@ -2789,15 +2795,37 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 bitSetRecyclable.set(0, batchMessageId.getBatchSize());
                 bitSetRecyclable.clear(batchMessageId.getBatchIndex());
             }
-            cmd = Commands.newAck(consumerId, ledgerId, entryId, 
bitSetRecyclable, ackType, validationError, properties,
-                    txnID.getLeastSigBits(), txnID.getMostSigBits(), 
requestId, batchMessageId.getBatchSize());
+            cmdList = Collections.singletonList(Commands.newAck(consumerId, 
ledgerId, entryId, bitSetRecyclable,
+                    ackType, validationError, properties, 
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId,
+                    batchMessageId.getBatchSize()));
             bitSetRecyclable.recycle();
         } else {
-            MessageIdImpl singleMessage = (MessageIdImpl) messageId;
-            ledgerId = singleMessage.getLedgerId();
-            entryId = singleMessage.getEntryId();
-            cmd = Commands.newAck(consumerId, ledgerId, entryId, 
bitSetRecyclable, ackType,
-                    validationError, properties, txnID.getLeastSigBits(), 
txnID.getMostSigBits(), requestId);
+            MessageIdImpl[] chunkMsgIds = 
this.unAckedChunkedMessageIdSequenceMap.remove(messageIdImpl);
+            // cumulative ack chunk by the last messageId
+            if (chunkMsgIds == null || ackType == AckType.Cumulative) {
+                cmdList = 
Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, null, 
ackType,
+                        validationError, properties, txnID.getLeastSigBits(), 
txnID.getMostSigBits(), requestId));
+            } else {
+                if (Commands.peerSupportsMultiMessageAcknowledgment(
+                        getClientCnx().getRemoteEndpointProtocolVersion())) {
+                    List<Triple<Long, Long, ConcurrentBitSetRecyclable>> 
entriesToAck =
+                            new ArrayList<>(chunkMsgIds.length);
+                    for (MessageIdImpl cMsgId : chunkMsgIds) {
+                        if (cMsgId != null && chunkMsgIds.length > 1) {
+                            entriesToAck.add(Triple.of(cMsgId.getLedgerId(), 
cMsgId.getEntryId(), null));
+                        }
+                    }
+                    cmdList = Collections.singletonList(
+                            newMultiTransactionMessageAck(consumerId, txnID, 
entriesToAck, requestId));
+                } else {
+                    cmdList = new ArrayList<>();
+                    for (MessageIdImpl cMsgId : chunkMsgIds) {
+                        cmdList.add(Commands.newAck(consumerId, 
cMsgId.ledgerId, cMsgId.entryId, null, ackType,
+                                validationError, properties,
+                                txnID.getLeastSigBits(), 
txnID.getMostSigBits(), requestId));
+                    }
+                }
+            }
         }
 
         if (ackType == AckType.Cumulative) {
@@ -2811,8 +2839,55 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     .ConnectException("Failed to ack message [" + messageId + 
"] "
                     + "for transaction [" + txnID + "] due to consumer connect 
fail, consumer state: " + getState()));
         } else {
-            return cnx.newAckForReceipt(cmd, requestId);
+            List<CompletableFuture<Void>> completableFutures = new 
LinkedList<>();
+            cmdList.forEach(cmd -> 
completableFutures.add(cnx.newAckForReceipt(cmd, requestId)));
+            return FutureUtil.waitForAll(completableFutures);
+        }
+    }
+
+    private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
+                                                  List<Triple<Long, Long, 
ConcurrentBitSetRecyclable>> entries,
+                                                  long requestID) { // one txn ACK command for all entries
+        BaseCommand cmd = newMultiMessageAckCommon(entries); // ACK command pre-filled with one message id per entry
+        cmd.getAck()
+                .setConsumerId(consumerId)
+                .setAckType(AckType.Individual) // this helper always emits an individual ack
+                .setTxnidLeastBits(txnID.getLeastSigBits())
+                .setTxnidMostBits(txnID.getMostSigBits())
+                .setRequestId(requestID);
+        return serializeWithSize(cmd); // Commands.serializeWithSize (static import)
+    }
+
+    private static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new 
FastThreadLocal<BaseCommand>() {
+        @Override
+        protected BaseCommand initialValue() throws Exception {
+            return new BaseCommand(); // per-thread instance; newMultiMessageAckCommon clears it before each use
+        }
+    };
+
+    private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, 
Long, ConcurrentBitSetRecyclable>> entries) {
+        BaseCommand cmd = LOCAL_BASE_COMMAND.get()
+                .clear() // reset the reused thread-local command before filling it
+                .setType(BaseCommand.Type.ACK);
+        CommandAck ack = cmd.setAck();
+        int entriesCount = entries.size();
+        for (int i = 0; i < entriesCount; i++) { // one MessageIdData per (ledgerId, entryId, bitSet) triple
+            long ledgerId = entries.get(i).getLeft();
+            long entryId = entries.get(i).getMiddle();
+            ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
+            MessageIdData msgId = ack.addMessageId()
+                    .setLedgerId(ledgerId)
+                    .setEntryId(entryId);
+            if (bitSet != null) { // a null bit set means no ack set is attached to this message id
+                long[] ackSet = bitSet.toLongArray();
+                for (int j = 0; j < ackSet.length; j++) {
+                    msgId.addAckSet(ackSet[j]);
+                }
+                bitSet.recycle(); // recycled here, after its words were copied into the ack set
+            }
         }
+
+        return cmd; // the thread-local instance itself, not a copy
     }
 
     public Map<MessageIdImpl, List<MessageImpl<T>>> 
getPossibleSendToDeadLetterTopicMessages() {

Reply via email to