codelipenghui commented on code in PR #21419:
URL: https://github.com/apache/pulsar/pull/21419#discussion_r1368499518
##########
pip/pip-311.md:
##########
@@ -0,0 +1,73 @@
+# Title: Add a request ID parameter to the `newMultiTransactionMessageAck`
method signature
+
+# Background knowledge
+**newMultiTransactionMessageAck**: A public API in `Commands.java` that is
used to assemble a `CommandAck` to individually ack a list of message IDs with
a transaction ID.
+
+
+# Motivation
+In Pulsar, all operations are encapsulated as individual commands.
+For instance, the action of sending a message by ClientCnx is a `Send`
command, and the broker has a corresponding `handleSend` method to handle the
request of sending messages.
+Subsequently, the broker sends a successful command, and ClientCnx will have a
corresponding 'HandleSuccess' method to process the returned successful command.
+ClientCnx locally caches each request's ID and future. Upon receiving a
response from the broker, it operates on the corresponding future based on the
request ID.
+For requests that do not receive a response within a given timeout, their
futures will be completed with a Timeout exception and removed from the local
cache map.
+This requires that all command requests waiting for a response should include
the request ID, so that the response returned by the broker can carry the
corresponding request ID.
+This way, ClientCnx can clearly identify which request has been processed and
completed.
+
+But the public API `newMultiTransactionMessageAck` in the `Commands.java`
creates the ack commands without request ID and hopes to wait for the response.
+This causes all the requests created by `newMultiTransactionMessageAck` to be
timed out.
Review Comment:
From the source code, I don't see any places that used
`newMultiTransactionMessageAck`. What is the actual problem that the proposal
want to fix?
And for the transaction ack, we are using the following method to create the
ack command, right?
```java
public static ByteBuf newAck(long consumerId, List<MessageIdData>
messageIds, AckType ackType,
ValidationError validationError,
Map<String, Long> properties, long txnIdLeastBits,
long txnIdMostBits, long requestId) {
BaseCommand cmd = localCmd(Type.ACK);
CommandAck ack = cmd.setAck()
.setConsumerId(consumerId)
.setAckType(ackType);
ack.addAllMessageIds(messageIds);
return newAck(validationError, properties, txnIdLeastBits,
txnIdMostBits, requestId, ack, cmd);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]