This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a598291e7aa [fix][broker] Release Netty buffer in finally block in
ServerCnx#handleLastMessageIdFromCompactedLedger (#19395)
a598291e7aa is described below
commit a598291e7aa0287d07df50ff1e26b02ea9e286df
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 2 14:11:56 2023 +0200
[fix][broker] Release Netty buffer in finally block in
ServerCnx#handleLastMessageIdFromCompactedLedger (#19395)
---
.../apache/pulsar/broker/service/ServerCnx.java | 35 ++++++++++++----------
1 file changed, 19 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 71db6e63883..56f3f07fd8c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2029,24 +2029,27 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
int partitionIndex, PositionImpl markDeletePosition) {
persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry
-> {
if (entry != null) {
- // in this case, all the data has been compacted, so return
the last position
- // in the compacted ledger to the client
- ByteBuf payload = entry.getDataBuffer();
- MessageMetadata metadata =
Commands.parseMessageMetadata(payload);
- int largestBatchIndex;
try {
- largestBatchIndex =
calculateTheLastBatchIndexInBatch(metadata, payload);
- } catch (IOException ioEx){
- writeAndFlush(Commands.newError(requestId,
ServerError.MetadataError,
- "Failed to deserialize batched message from the
last entry of the compacted Ledger: "
- + ioEx.getMessage()));
- return;
+ // in this case, all the data has been compacted, so
return the last position
+ // in the compacted ledger to the client
+ ByteBuf payload = entry.getDataBuffer();
+ MessageMetadata metadata =
Commands.parseMessageMetadata(payload);
+ int largestBatchIndex;
+ try {
+ largestBatchIndex =
calculateTheLastBatchIndexInBatch(metadata, payload);
+ } catch (IOException ioEx) {
+ writeAndFlush(Commands.newError(requestId,
ServerError.MetadataError,
+ "Failed to deserialize batched message from
the last entry of the compacted Ledger: "
+ + ioEx.getMessage()));
+ return;
+ }
+
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+ entry.getLedgerId(), entry.getEntryId(),
partitionIndex, largestBatchIndex,
+ markDeletePosition != null ?
markDeletePosition.getLedgerId() : -1,
+ markDeletePosition != null ?
markDeletePosition.getEntryId() : -1));
+ } finally {
+ entry.release();
}
- writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
- entry.getLedgerId(), entry.getEntryId(),
partitionIndex, largestBatchIndex,
- markDeletePosition != null ?
markDeletePosition.getLedgerId() : -1,
- markDeletePosition != null ?
markDeletePosition.getEntryId() : -1));
- entry.release();
} else {
// in this case, the ledgers been removed except the current
ledger
// and current ledger without any data