This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 926834ef8b2 [improve][txn] Support ack message list for transaction
(#15729)
926834ef8b2 is described below
commit 926834ef8b2b57d3964aa7e9773e6245bcee861c
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Jul 5 12:42:23 2022 +0800
[improve][txn] Support ack message list for transaction (#15729)
* [improve][txn] Support ack message list for transaction
### Motivation
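Currently, the client has an internal method that acknowledges a list of
message IDs within a transaction:
```java
doAcknowledgeWithTxn(List<MessageId> messageIdList, AckType ackType,
                     Map<String, Long> properties, TransactionImpl txn)
```
but there is no corresponding public interface method:
```java
acknowledgeAsync(List<MessageId> messageIdList, Transaction txn)
```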
---
.../pulsar/broker/transaction/TransactionTest.java | 144 +++++++++++++++++++++
.../org/apache/pulsar/client/api/Consumer.java | 37 ++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 42 +++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 80 +++++++++++-
.../apache/pulsar/common/protocol/Commands.java | 20 ++-
5 files changed, 314 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e95a811528c..78d499cbb62 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -38,6 +38,7 @@ import io.netty.buffer.Unpooled;
import io.netty.util.Timeout;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -106,7 +107,9 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessagesImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -1020,6 +1023,147 @@ public class TransactionTest extends
TransactionTestBase {
transaction.commit().get();
}
+ @Test(timeOut = 30000)
+ public void testTransactionAckMessageList() throws Exception {
+ String topic = "persistent://" + NAMESPACE1 +"/test";
+ String subName = "testSub";
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().send();
+ }
+ //verify using aborted transaction to ack message list
+ List<MessageId> messages = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ Message<byte[]> message = consumer.receive();
+ messages.add(message.getMessageId());
+ }
+ Transaction transaction = pulsarClient
+ .newTransaction()
+ .withTransactionTimeout(5, TimeUnit.MINUTES)
+ .build()
+ .get();
+
+ consumer.acknowledgeAsync(messages, transaction);
+ transaction.abort().get();
+ consumer.close();
+ consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+ for (int i = 0; i < 4; i++) {
+ Message<byte[]> message = consumer.receive();
+ assertTrue(messages.contains(message.getMessageId()));
+ }
+
+ //verify using committed transaction to ack message list
+ transaction = pulsarClient
+ .newTransaction()
+ .withTransactionTimeout(5, TimeUnit.MINUTES)
+ .build()
+ .get();
+ consumer.acknowledgeAsync(messages, transaction);
+ transaction.commit().get();
+
+ consumer.close();
+ consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ Assert.assertFalse(messages.contains(message.getMessageId()));
+ message = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+ consumer.close();
+ }
+
+
+ @Test(timeOut = 30000)
+ public void testTransactionAckMessages() throws Exception {
+ String topic = "persistent://" + NAMESPACE1
+"/testTransactionAckMessages";
+ String subName = "testSub";
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+
+ for (int i = 0; i < 4; i++) {
+ producer.newMessage().send();
+ }
+ Method method =
ConsumerBase.class.getDeclaredMethod("getNewMessagesImpl");
+ method.setAccessible(true);
+
+ Field field = MessagesImpl.class.getDeclaredField("messageList");
+ field.setAccessible(true);
+
+ MessagesImpl<byte[]> messages = (MessagesImpl<byte[]>)
method.invoke(consumer);
+
+ List<Message<byte[]>> messageList = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ Message<byte[]> message = consumer.receive();
+ messageList.add(message);
+ }
+ field.set(messages, messageList);
+ Transaction transaction = pulsarClient
+ .newTransaction()
+ .withTransactionTimeout(5, TimeUnit.MINUTES)
+ .build()
+ .get();
+
+ consumer.acknowledgeAsync(messages, transaction);
+ transaction.abort().get();
+ consumer.close();
+ consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+ List<MessageId> messageIds = new ArrayList<>();
+ for (Message message : messageList) {
+ messageIds.add(message.getMessageId());
+ }
+ for (int i = 0; i < 4; i++) {
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ assertTrue(messageIds.contains(message.getMessageId()));
+ }
+
+ //verify using committed transaction to ack message list
+ transaction = pulsarClient
+ .newTransaction()
+ .withTransactionTimeout(5, TimeUnit.MINUTES)
+ .build()
+ .get();
+ consumer.acknowledgeAsync(messages, transaction);
+ transaction.commit().get();
+
+ consumer.close();
+ consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+ consumer.close();
+ }
+
@Test
public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws
Exception {
String topic = NAMESPACE1 +
"/testGetConnectExceptionForAckMsgWhenCnxIsNull";
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index b7a9dd9ef48..3017cc2b4a0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -491,6 +491,26 @@ public interface Consumer<T> extends Closeable {
*/
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);
+
+ /**
+ * Asynchronously acknowledge the consumption of {@link Messages}; the acks are stored in the pending ack state.
+ * After the transaction commits, the messages are actually acknowledged.
+ * After the transaction aborts, the messages are redelivered.
+ * @param messages
+ * The {@link Messages} to be acknowledged
+ * @param txn {@link Transaction} The transaction to ack messages.
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ * @throws
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
+ * if the ack with messageId is less than the messageId in
pending ack state or ack with transaction is
+ * different from the transaction in pending ack.
+ * @throws
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
+ * if the broker does not support transactions
+ * @return {@link CompletableFuture} the future of the ack result
+ * */
+ CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction
txn);
+
+
/**
* Asynchronously acknowledge the consumption of a list of message.
* @param messageIdList
@@ -498,6 +518,23 @@ public interface Consumer<T> extends Closeable {
*/
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);
+
+ /**
+ * Asynchronously acknowledge the consumption of a list of messages; the acks are stored in the pending ack state.
+ * After the transaction commits, the messages are actually acknowledged.
+ * After the transaction aborts, the messages are redelivered.
+ * @param messageIdList A list of message Id.
+ * @param txn {@link Transaction} The transaction to ack messages.
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ * @throws
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
+ * if the ack with messageId is less than the messageId in
pending ack state or ack with transaction is
+ * different from the transaction in pending ack.
+ * @throws
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
+ * if the broker does not support transactions
+ * @return {@link CompletableFuture} the future of the ack result */
+ CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList,
Transaction txn);
+
/**
* Asynchronously reconsumeLater the consumption of a single message.
*
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 0da17def097..aeb1b9ed7db 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -485,6 +485,11 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
@Override
public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
+ return acknowledgeAsync(messages, null);
+ }
+
+ @Override
+ public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages,
Transaction txn) {
List<MessageId> messageIds = new ArrayList<>(messages.size());
for (Message<?> message: messages) {
try {
@@ -494,7 +499,11 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
messageIds.add(message.getMessageId());
}
- return acknowledgeAsync(messageIds);
+ if (txn != null) {
+ return acknowledgeAsync(messageIds, txn);
+ } else {
+ return acknowledgeAsync(messageIds);
+ }
}
@Override
@@ -502,6 +511,11 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
return doAcknowledgeWithTxn(messageIdList, AckType.Individual,
Collections.emptyMap(), null);
}
+ @Override
+ public CompletableFuture<Void> acknowledgeAsync(List<MessageId>
messageIdList, Transaction txn) {
+ return doAcknowledgeWithTxn(messageIdList, AckType.Individual,
Collections.emptyMap(), (TransactionImpl) txn);
+ }
+
@Override
public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message,
long delayTime, TimeUnit unit) {
return reconsumeLaterAsync(message, null, delayTime, unit);
@@ -611,12 +625,21 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
Map<String, Long>
properties,
TransactionImpl
txn) {
CompletableFuture<Void> ackFuture;
- if (txn != null) {
+ if (txn != null && this instanceof ConsumerImpl) {
+
+ // It is okay that we register the acked topic after sending the acknowledgements, because
+ // the transactional ack will not be visible to consumers until the transaction is
+ // committed.
+ if (ackType == AckType.Cumulative) {
+ txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
+ }
+
ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageIdList,
ackType, properties, txn));
+ // register the ackFuture as part of the transaction
txn.registerAckOp(ackFuture);
} else {
- ackFuture = doAcknowledge(messageIdList, ackType, properties,
null);
+ ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
}
return ackFuture;
}
@@ -787,12 +810,25 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
}
}
+ protected void onAcknowledge(List<MessageId> messageIds, Throwable
exception) {
+ if (interceptors != null) {
+ messageIds.forEach(messageId -> interceptors.onAcknowledge(this,
messageId, exception));
+ }
+ }
+
protected void onAcknowledgeCumulative(MessageId messageId, Throwable
exception) {
if (interceptors != null) {
interceptors.onAcknowledgeCumulative(this, messageId, exception);
}
}
+ protected void onAcknowledgeCumulative(List<MessageId> messageIds,
Throwable exception) {
+ if (interceptors != null) {
+ messageIds.forEach(messageId ->
interceptors.onAcknowledgeCumulative(this, messageId, exception));
+ }
+ }
+
+
protected void onNegativeAcksSend(Set<MessageId> messageIds) {
if (interceptors != null) {
interceptors.onNegativeAcksSend(this, messageIds);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5228810f6cf..c9cf4a12b75 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -587,11 +588,28 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
- protected CompletableFuture<Void> doAcknowledge(List<MessageId>
messageIdList,
- AckType ackType,
- Map<String, Long>
properties,
- TransactionImpl txn) {
- return
this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList,
ackType, properties);
+ protected CompletableFuture<Void> doAcknowledge(List<MessageId>
messageIdList, AckType ackType,
+ Map<String, Long>
properties, TransactionImpl txn) {
+
+ for (MessageId messageId : messageIdList) {
+ checkArgument(messageId instanceof MessageIdImpl);
+ }
+ if (getState() != State.Ready && getState() != State.Connecting) {
+ stats.incrementNumAcksFailed();
+ PulsarClientException exception = new
PulsarClientException("Consumer not ready. State: " + getState());
+ if (AckType.Individual.equals(ackType)) {
+ onAcknowledge(messageIdList, exception);
+ } else if (AckType.Cumulative.equals(ackType)) {
+ onAcknowledgeCumulative(messageIdList, exception);
+ }
+ return FutureUtil.failedFuture(exception);
+ }
+ if (txn != null) {
+ return doTransactionAcknowledgeForResponse(messageIdList, ackType,
null,
+ properties, new TxnID(txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits()));
+ } else {
+ return
this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList,
ackType, properties);
+ }
}
@@ -2704,6 +2722,58 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
}
+ private CompletableFuture<Void>
doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+
ValidationError validationError,
+
Map<String, Long> properties, TxnID txnID) {
+ BitSetRecyclable bitSetRecyclable = null;
+ long ledgerId;
+ long entryId;
+ ByteBuf cmd;
+ long requestId = client.newRequestId();
+ List<MessageIdData> messageIdDataList = new LinkedList<>();
+ for (MessageId messageId : messageIds) {
+ if (messageId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
+ bitSetRecyclable = BitSetRecyclable.create();
+ if (ackType == AckType.Cumulative) {
+ batchMessageId.ackCumulative();
+ bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+ bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() +
1);
+ } else {
+ bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+ bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+ }
+ MessageIdData messageIdData = new MessageIdData();
+ messageIdData.setLedgerId(batchMessageId.getLedgerId());
+ messageIdData.setEntryId(batchMessageId.getEntryId());
+ messageIdData.setBatchSize(batchMessageId.getBatchSize());
+ long[] as = bitSetRecyclable.toLongArray();
+ for (int i = 0; i < as.length; i++) {
+ messageIdData.addAckSet(as[i]);
+ }
+ bitSetRecyclable.recycle();
+ messageIdDataList.add(messageIdData);
+ } else {
+ MessageIdImpl singleMessage = (MessageIdImpl) messageId;
+ ledgerId = singleMessage.getLedgerId();
+ entryId = singleMessage.getEntryId();
+ MessageIdData messageIdData = new MessageIdData();
+ messageIdData.setLedgerId(ledgerId);
+ messageIdData.setEntryId(entryId);
+ messageIdDataList.add(messageIdData);
+ }
+
+ if (ackType == AckType.Cumulative) {
+ unAckedMessageTracker.removeMessagesTill(messageId);
+ } else {
+ unAckedMessageTracker.remove(messageId);
+ }
+ }
+ cmd = Commands.newAck(consumerId, messageIdDataList, ackType,
validationError, properties,
+ txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
+ return cnx().newAckForReceipt(cmd, requestId);
+ }
+
public Map<MessageIdImpl, List<MessageImpl<T>>>
getPossibleSendToDeadLetterTopicMessages() {
return possibleSendToDeadLetterTopicMessages;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d8151579353..62243063741 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -990,9 +990,26 @@ public class Commands {
}
if (batchSize >= 0) {
- messageIdData.setBatchSize(batchSize);
+ messageIdData.setBatchSize(batchSize);
}
+ return newAck(validationError, properties, txnIdLeastBits,
txnIdMostBits, requestId, ack, cmd);
+ }
+
+ public static ByteBuf newAck(long consumerId, List<MessageIdData>
messageIds, AckType ackType,
+ ValidationError validationError, Map<String,
Long> properties, long txnIdLeastBits,
+ long txnIdMostBits, long requestId) {
+ BaseCommand cmd = localCmd(Type.ACK);
+ CommandAck ack = cmd.setAck()
+ .setConsumerId(consumerId)
+ .setAckType(ackType);
+ ack.addAllMessageIds(messageIds);
+
+ return newAck(validationError, properties, txnIdLeastBits,
txnIdMostBits, requestId, ack, cmd);
+ }
+
+ private static ByteBuf newAck(ValidationError validationError, Map<String,
Long> properties, long txnIdLeastBits,
+ long txnIdMostBits, long requestId,
CommandAck ack, BaseCommand cmd) {
if (validationError != null) {
ack.setValidationError(validationError);
}
@@ -1014,6 +1031,7 @@ public class Commands {
return serializeWithSize(cmd);
}
+
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId,
BitSetRecyclable ackSet, AckType ackType,
ValidationError validationError, Map<String,
Long> properties, long txnIdLeastBits,
long txnIdMostBits, long requestId) {