codelipenghui commented on code in PR #21268:
URL: https://github.com/apache/pulsar/pull/21268#discussion_r1383321688
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2800,12 +2806,34 @@ private CompletableFuture<Void>
doTransactionAcknowledgeForResponse(MessageId me
} else {
bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
}
- cmd = Commands.newAck(consumerId, ledgerId, entryId,
bitSetRecyclable, ackType, validationError, properties,
- txnID.getLeastSigBits(), txnID.getMostSigBits(),
requestId, messageIdAdv.getBatchSize());
+ cmdList.add(Commands.newAck(consumerId, ledgerId, entryId,
bitSetRecyclable, ackType, validationError, properties,
+ txnID.getLeastSigBits(), txnID.getMostSigBits(),
requestId, messageIdAdv.getBatchSize()));
Review Comment:
Oh, sorry. I misunderstand this part.
My suggestion is to use the SingtonList and the ArrayList instead of using
the LinkedList.
It will make the code easier to understand.
```diff
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 0c17626484..e848a94402 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2796,7 +2796,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
final long ledgerId = messageIdAdv.getLedgerId();
final long entryId = messageIdAdv.getEntryId();
- final List<ByteBuf> cmdList = new LinkedList<>();
+ final List<ByteBuf> cmdList;
if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
@@ -2806,15 +2806,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
} else {
bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
}
- cmdList.add(Commands.newAck(consumerId, ledgerId, entryId,
bitSetRecyclable, ackType, validationError, properties,
- txnID.getLeastSigBits(), txnID.getMostSigBits(),
requestId, messageIdAdv.getBatchSize()));
+ cmdList = Collections.singletonList(Commands.newAck(consumerId,
ledgerId, entryId, bitSetRecyclable,
+ ackType, validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId,
+ messageIdAdv.getBatchSize()));
bitSetRecyclable.recycle();
} else {
MessageIdImpl[] chunkMsgIds =
this.unAckedChunkedMessageIdSequenceMap.remove(messageIdAdv);
// cumulative ack chunk by the last messageId
if (chunkMsgIds == null || ackType == AckType.Cumulative) {
- cmdList.add(Commands.newAck(consumerId, ledgerId, entryId,
null, ackType, validationError,
- properties, txnID.getLeastSigBits(),
txnID.getMostSigBits(), requestId));
+ cmdList =
Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, null,
+ ackType, validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(),
+ requestId));
} else {
if (Commands.peerSupportsMultiMessageAcknowledgment(
getClientCnx().getRemoteEndpointProtocolVersion()))
{
@@ -2825,8 +2827,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
}
}
- cmdList.add(newMultiTransactionMessageAck(consumerId,
txnID, entriesToAck, requestId));
+ cmdList =
Collections.singletonList(newMultiTransactionMessageAck(consumerId, txnID,
entriesToAck,
+ requestId));
} else {
+ cmdList = new ArrayList<>();
for (MessageIdImpl cMsgId : chunkMsgIds) {
cmdList.add(Commands.newAck(consumerId,
cMsgId.ledgerId, cMsgId.entryId, null, ackType,
validationError, properties,
@@ -2854,7 +2858,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID
txnID,
- List<Triple<Long, Long,
ConcurrentBitSetRecyclable>> entries, long requestID) {
+ List<Triple<Long, Long, ConcurrentBitSetRecyclable>>
entries, long requestID) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
.setConsumerId(consumerId)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]