This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e6396bbe344 [fix][txn] Transaction cumulative ack redeliver change 
(#14371)
e6396bbe344 is described below

commit e6396bbe344867f8821e3f5975d385cd4e5b5251
Author: congbo <[email protected]>
AuthorDate: Wed Jul 13 14:33:19 2022 +0800

    [fix][txn] Transaction cumulative ack redeliver change (#14371)
    
    https://github.com/apache/pulsar/pull/10478
    
    ### Motivation
    Since #10478 was merged, we should change the redelivery logic used when a 
transaction that contains a cumulative ack is aborted. The server can't 
redeliver the messages whose cumulative ack was not committed, because the 
client would receive and ack newer messages and only afterwards receive the 
older messages whose ack was aborted.
    
    For example:
    1. We have 5 messages.
    2. We cumulatively ack 3 messages (1, 2 and 3) with a transaction.
    3. We abort this transaction.
    4. The server redelivers the messages using the current consumer_epoch.
    5. The client does not filter out messages 4 and 5, because #10478 does 
not change the client's consumer epoch.
    6. If the client then cumulatively acks messages 4 and 5 with a transaction 
and commits it, messages 1, 2 and 3 are lost and the messages are consumed out 
of order.
    ### Modifications
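    Don't redeliver any messages from the server when a transaction with a 
cumulative ack is aborted; redelivery is left to the user (for example, by 
calling redeliverUnacknowledgedMessages() on the consumer).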
---
 .../pendingack/impl/PendingAckHandleImpl.java      |  6 +-
 .../client/impl/TransactionEndToEndTest.java       | 69 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 15 -----
 .../client/impl/transaction/TransactionImpl.java   | 25 +-------
 4 files changed, 74 insertions(+), 41 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 1a159974700..c74fb3e9217 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
-import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -528,9 +527,10 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
                         if (cumulativeAckOfTransaction.getKey().equals(txnId)) 
{
                             cumulativeAckOfTransaction = null;
                         }
-                        //TODO: pendingAck handle next pr will fix
-                        
persistentSubscription.redeliverUnacknowledgedMessages(consumer, 
DEFAULT_CONSUMER_EPOCH);
                         abortFuture.complete(null);
+
+                        // in cumulative ack with transaction, don't depend on 
server redeliver message,
+                        // it will cause the messages to be out of order
                     }).exceptionally(e -> {
                         log.error("[{}] Transaction pending ack store abort 
txnId : [{}] fail!",
                                 topicName, txnId, e);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 3a40982fed4..b372f0b61c5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import io.netty.channel.ChannelHandlerContext;
@@ -1066,6 +1067,74 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         }
     }
 
+    @Test
+    public void testCumulativeAckRedeliverMessages() throws Exception {
+        String topic = NAMESPACE1 + "/testCumulativeAckRedeliverMessages";
+
+        int count = 5;
+        int transactionCumulativeAck = 3;
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        // send 5 messages
+        for (int i = 0; i < count; i++) {
+            producer.send((i + "").getBytes(UTF_8));
+        }
+
+        Transaction transaction = getTxn();
+        Transaction invalidTransaction = getTxn();
+
+        Message<byte[]> message = null;
+        for (int i = 0; i < transactionCumulativeAck; i++) {
+            message = consumer.receive();
+        }
+
+        // receive transaction in order
+        assertEquals(message.getValue(), (transactionCumulativeAck - 1 + 
"").getBytes(UTF_8));
+
+        // ack the last message
+        consumer.acknowledgeCumulativeAsync(message.getMessageId(), 
transaction).get();
+
+        // another ack will throw TransactionConflictException
+        try {
+            consumer.acknowledgeCumulativeAsync(message.getMessageId(), 
invalidTransaction).get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof 
PulsarClientException.TransactionConflictException);
+            // abort transaction then redeliver messages
+            transaction.abort().get();
+            // consumer redeliver messages
+            consumer.redeliverUnacknowledgedMessages();
+        }
+
+        // receive the rest of the message
+        for (int i = 0; i < count; i++) {
+            message = consumer.receive();
+        }
+
+        Transaction commitTransaction = getTxn();
+
+        // the last message received must be the final message sent
+        assertEquals(message.getValue(), (count - 1 + "").getBytes(UTF_8));
+        // ack the end of the message
+        consumer.acknowledgeCumulativeAsync(message.getMessageId(), 
commitTransaction).get();
+
+        commitTransaction.commit().get();
+
+        // then redeliver will not receive any message
+        message = consumer.receive(3, TimeUnit.SECONDS);
+        assertNull(message);
+    }
+
     @Test
     public void testSendTxnMessageTimeout() throws Exception {
         String topic = NAMESPACE1 + "/testSendTxnMessageTimeout";
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index aeb1b9ed7db..4a60dad8bfc 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -626,14 +626,6 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
                                                            TransactionImpl 
txn) {
         CompletableFuture<Void> ackFuture;
         if (txn != null && this instanceof ConsumerImpl) {
-
-            // it is okay that we register acked topic after sending the 
acknowledgements. because
-            // the transactional ack will not be visiable for consumers until 
the transaction is
-            // committed
-            if (ackType == AckType.Cumulative) {
-                txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
-            }
-
             ackFuture = txn.registerAckedTopic(getTopic(), subscription)
                     .thenCompose(ignored -> doAcknowledge(messageIdList, 
ackType, properties, txn));
             // register the ackFuture as part of the transaction
@@ -649,13 +641,6 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
                                                            TransactionImpl 
txn) {
         CompletableFuture<Void> ackFuture;
         if (txn != null && (this instanceof ConsumerImpl)) {
-            // it is okay that we register acked topic after sending the 
acknowledgements. because
-            // the transactional ack will not be visiable for consumers until 
the transaction is
-            // committed
-            if (ackType == AckType.Cumulative) {
-                txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
-            }
-
             ackFuture = txn.registerAckedTopic(getTopic(), subscription)
                     .thenCompose(ignored -> doAcknowledge(messageId, ackType, 
properties, txn));
             // register the ackFuture as part of the transaction
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index aa7a18047de..55b20438693 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -37,7 +36,6 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 
@@ -62,7 +60,6 @@ public class TransactionImpl implements Transaction , 
TimerTask {
     private final Map<String, CompletableFuture<Void>> registerPartitionMap;
     private final Map<Pair<String, String>, CompletableFuture<Void>> 
registerSubscriptionMap;
     private final TransactionCoordinatorClientImpl tcClient;
-    private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;
 
     private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
     private final ArrayList<CompletableFuture<Void>> ackFutureList;
@@ -122,9 +119,8 @@ public class TransactionImpl implements Transaction , 
TimerTask {
                     }
                 });
             }
-        } else {
-            return completableFuture;
         }
+        return completableFuture;
     }
 
     public synchronized void registerSendOp(CompletableFuture<MessageId> 
sendFuture) {
@@ -147,22 +143,14 @@ public class TransactionImpl implements Transaction , 
TimerTask {
                     }
                 });
             }
-        } else {
-            return completableFuture;
         }
+        return completableFuture;
     }
 
     public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
         ackFutureList.add(ackFuture);
     }
 
-    public synchronized void registerCumulativeAckConsumer(ConsumerImpl<?> 
consumer) {
-        if (this.cumulativeAckConsumers == null) {
-            this.cumulativeAckConsumers = new HashMap<>();
-        }
-        cumulativeAckConsumers.put(consumer, 0);
-    }
-
     @Override
     public CompletableFuture<Void> commit() {
         timeout.cancel();
@@ -202,16 +190,7 @@ public class TransactionImpl implements Transaction , 
TimerTask {
                 if (e != null) {
                     log.error(e.getMessage());
                 }
-                if (cumulativeAckConsumers != null) {
-                    cumulativeAckConsumers.forEach((consumer, integer) ->
-                            cumulativeAckConsumers
-                                    .putIfAbsent(consumer, 
consumer.clearIncomingMessagesAndGetMessageNumber()));
-                }
                 tcClient.abortAsync(new TxnID(txnIdMostBits, 
txnIdLeastBits)).whenComplete((vx, ex) -> {
-                    if (cumulativeAckConsumers != null) {
-                        
cumulativeAckConsumers.forEach(ConsumerImpl::increaseAvailablePermits);
-                        cumulativeAckConsumers.clear();
-                    }
 
                     if (ex != null) {
                         if (ex instanceof TransactionNotFoundException

Reply via email to