This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e6396bbe344 [fix][txn] Transaction cumulative ack redeliver change
(#14371)
e6396bbe344 is described below
commit e6396bbe344867f8821e3f5975d385cd4e5b5251
Author: congbo <[email protected]>
AuthorDate: Wed Jul 13 14:33:19 2022 +0800
[fix][txn] Transaction cumulative ack redeliver change (#14371)
https://github.com/apache/pulsar/pull/10478
### Motivation
Since #10478 was merged, we should change the redelivery logic used when a
transaction with a cumulative ack is aborted. The server can't redeliver the
messages whose cumulative ack was aborted, because the client will receive and
ack newer messages first and only then receive the old messages from the
aborted transaction.
For example:
1. we have 5 messages
2. the client cumulatively acks the first 3 messages with a transaction
3. we abort this transaction
4. the server redelivers the messages using the current consumer_epoch
5. the client will not filter out messages 4 and 5, because in #10478 we
don't change the client consumer epoch
6. the client cumulatively acks messages 4 and 5 with a transaction and commits
it; messages 1, 2 and 3 are lost and the messages are consumed out of order.
### Modifications
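Don't redeliver any cumulatively acked messages from the server when the
transaction is aborted; redelivery is left to the user (for example by calling
consumer.redeliverUnacknowledgedMessages()).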
---
.../pendingack/impl/PendingAckHandleImpl.java | 6 +-
.../client/impl/TransactionEndToEndTest.java | 69 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 15 -----
.../client/impl/transaction/TransactionImpl.java | 25 +-------
4 files changed, 74 insertions(+), 41 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 1a159974700..c74fb3e9217 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
import static
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
import static
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
-import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -528,9 +527,10 @@ public class PendingAckHandleImpl extends
PendingAckHandleState implements Pendi
if (cumulativeAckOfTransaction.getKey().equals(txnId))
{
cumulativeAckOfTransaction = null;
}
- //TODO: pendingAck handle next pr will fix
-
persistentSubscription.redeliverUnacknowledgedMessages(consumer,
DEFAULT_CONSUMER_EPOCH);
abortFuture.complete(null);
+
+ // in cumulative ack with transaction, don't depend on
server redeliver message,
+ // it will cause the messages to be out of order
}).exceptionally(e -> {
log.error("[{}] Transaction pending ack store abort
txnId : [{}] fail!",
topicName, txnId, e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 3a40982fed4..b372f0b61c5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.channel.ChannelHandlerContext;
@@ -1066,6 +1067,74 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
}
+ @Test
+ public void testCumulativeAckRedeliverMessages() throws Exception {
+ String topic = NAMESPACE1 + "/testCumulativeAckRedeliverMessages";
+
+ int count = 5;
+ int transactionCumulativeAck = 3;
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("test")
+ .subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ // send 5 messages
+ for (int i = 0; i < count; i++) {
+ producer.send((i + "").getBytes(UTF_8));
+ }
+
+ Transaction transaction = getTxn();
+ Transaction invalidTransaction = getTxn();
+
+ Message<byte[]> message = null;
+ for (int i = 0; i < transactionCumulativeAck; i++) {
+ message = consumer.receive();
+ }
+
+ // receive transaction in order
+ assertEquals(message.getValue(), (transactionCumulativeAck - 1 +
"").getBytes(UTF_8));
+
+ // ack the last message
+ consumer.acknowledgeCumulativeAsync(message.getMessageId(),
transaction).get();
+
+ // another ack will throw TransactionConflictException
+ try {
+ consumer.acknowledgeCumulativeAsync(message.getMessageId(),
invalidTransaction).get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof
PulsarClientException.TransactionConflictException);
+ // abort transaction then redeliver messages
+ transaction.abort().get();
+ // consumer redeliver messages
+ consumer.redeliverUnacknowledgedMessages();
+ }
+
+ // receive the rest of the message
+ for (int i = 0; i < count; i++) {
+ message = consumer.receive();
+ }
+
+ Transaction commitTransaction = getTxn();
+
+ // receive the first message
+ assertEquals(message.getValue(), (count - 1 + "").getBytes(UTF_8));
+ // ack the end of the message
+ consumer.acknowledgeCumulativeAsync(message.getMessageId(),
commitTransaction).get();
+
+ commitTransaction.commit().get();
+
+ // then redeliver will not receive any message
+ message = consumer.receive(3, TimeUnit.SECONDS);
+ assertNull(message);
+ }
+
@Test
public void testSendTxnMessageTimeout() throws Exception {
String topic = NAMESPACE1 + "/testSendTxnMessageTimeout";
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index aeb1b9ed7db..4a60dad8bfc 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -626,14 +626,6 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
TransactionImpl
txn) {
CompletableFuture<Void> ackFuture;
if (txn != null && this instanceof ConsumerImpl) {
-
- // it is okay that we register acked topic after sending the
acknowledgements. because
- // the transactional ack will not be visiable for consumers until
the transaction is
- // committed
- if (ackType == AckType.Cumulative) {
- txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
- }
-
ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageIdList,
ackType, properties, txn));
// register the ackFuture as part of the transaction
@@ -649,13 +641,6 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
TransactionImpl
txn) {
CompletableFuture<Void> ackFuture;
if (txn != null && (this instanceof ConsumerImpl)) {
- // it is okay that we register acked topic after sending the
acknowledgements. because
- // the transactional ack will not be visiable for consumers until
the transaction is
- // committed
- if (ackType == AckType.Cumulative) {
- txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
- }
-
ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageId, ackType,
properties, txn));
// register the ackFuture as part of the transaction
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index aa7a18047de..55b20438693 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -37,7 +36,6 @@ import org.apache.pulsar.client.api.transaction.Transaction;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.FutureUtil;
@@ -62,7 +60,6 @@ public class TransactionImpl implements Transaction ,
TimerTask {
private final Map<String, CompletableFuture<Void>> registerPartitionMap;
private final Map<Pair<String, String>, CompletableFuture<Void>>
registerSubscriptionMap;
private final TransactionCoordinatorClientImpl tcClient;
- private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;
private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
private final ArrayList<CompletableFuture<Void>> ackFutureList;
@@ -122,9 +119,8 @@ public class TransactionImpl implements Transaction ,
TimerTask {
}
});
}
- } else {
- return completableFuture;
}
+ return completableFuture;
}
public synchronized void registerSendOp(CompletableFuture<MessageId>
sendFuture) {
@@ -147,22 +143,14 @@ public class TransactionImpl implements Transaction ,
TimerTask {
}
});
}
- } else {
- return completableFuture;
}
+ return completableFuture;
}
public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
ackFutureList.add(ackFuture);
}
- public synchronized void registerCumulativeAckConsumer(ConsumerImpl<?>
consumer) {
- if (this.cumulativeAckConsumers == null) {
- this.cumulativeAckConsumers = new HashMap<>();
- }
- cumulativeAckConsumers.put(consumer, 0);
- }
-
@Override
public CompletableFuture<Void> commit() {
timeout.cancel();
@@ -202,16 +190,7 @@ public class TransactionImpl implements Transaction ,
TimerTask {
if (e != null) {
log.error(e.getMessage());
}
- if (cumulativeAckConsumers != null) {
- cumulativeAckConsumers.forEach((consumer, integer) ->
- cumulativeAckConsumers
- .putIfAbsent(consumer,
consumer.clearIncomingMessagesAndGetMessageNumber()));
- }
tcClient.abortAsync(new TxnID(txnIdMostBits,
txnIdLeastBits)).whenComplete((vx, ex) -> {
- if (cumulativeAckConsumers != null) {
-
cumulativeAckConsumers.forEach(ConsumerImpl::increaseAvailablePermits);
- cumulativeAckConsumers.clear();
- }
if (ex != null) {
if (ex instanceof TransactionNotFoundException