gaoran10 opened a new issue #7981:
URL: https://github.com/apache/pulsar/issues/7981
**Describe the solution you'd like**
Currently, it seems that the pending-ack state only supports the single
messages, if we want to make the batch messages support this state, we need to
make some changes.
When the consumer makes an acknowledgment with a transaction, the Pulsar
Broker will record the ack position in a map, but this is not enough for the
batch messages.
**Some code**
```
// Map to keep track of message ack by each txn.
private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashSet<Position>>
pendingAckMessagesMap;
// Messages acked by ongoing transaction, pending transaction commit to
materialize the acks. For faster look up.
// Using hashset as a message should only be acked once by one transaction.
private ConcurrentOpenHashSet<Position> pendingAckMessages;
...
// If try to ack message already acked by some ongoing transaction(can be
itself), throw exception.
// Acking single message within range of cumulative ack(if exist) is
considered valid operation.
if (this.pendingAckMessages.contains(position)) {
String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" +
txnId +
" try to ack message:" + position + " in pending ack
status.";
log.error(errorMsg);
throw new TransactionConflictException(errorMsg);
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]