This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a598291e7aa [fix][broker] Release Netty buffer in finally block in ServerCnx#handleLastMessageIdFromCompactedLedger (#19395)
a598291e7aa is described below

commit a598291e7aa0287d07df50ff1e26b02ea9e286df
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 2 14:11:56 2023 +0200

    [fix][broker] Release Netty buffer in finally block in ServerCnx#handleLastMessageIdFromCompactedLedger (#19395)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 35 ++++++++++++----------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 71db6e63883..56f3f07fd8c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2029,24 +2029,27 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             int partitionIndex, PositionImpl markDeletePosition) {
         
persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry
 -> {
             if (entry != null) {
-                // in this case, all the data has been compacted, so return 
the last position
-                // in the compacted ledger to the client
-                ByteBuf payload = entry.getDataBuffer();
-                MessageMetadata metadata = 
Commands.parseMessageMetadata(payload);
-                int largestBatchIndex;
                 try {
-                    largestBatchIndex = 
calculateTheLastBatchIndexInBatch(metadata, payload);
-                } catch (IOException ioEx){
-                    writeAndFlush(Commands.newError(requestId, 
ServerError.MetadataError,
-                            "Failed to deserialize batched message from the 
last entry of the compacted Ledger: "
-                                    + ioEx.getMessage()));
-                    return;
+                    // in this case, all the data has been compacted, so 
return the last position
+                    // in the compacted ledger to the client
+                    ByteBuf payload = entry.getDataBuffer();
+                    MessageMetadata metadata = 
Commands.parseMessageMetadata(payload);
+                    int largestBatchIndex;
+                    try {
+                        largestBatchIndex = 
calculateTheLastBatchIndexInBatch(metadata, payload);
+                    } catch (IOException ioEx) {
+                        writeAndFlush(Commands.newError(requestId, 
ServerError.MetadataError,
+                                "Failed to deserialize batched message from 
the last entry of the compacted Ledger: "
+                                        + ioEx.getMessage()));
+                        return;
+                    }
+                    
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                            entry.getLedgerId(), entry.getEntryId(), 
partitionIndex, largestBatchIndex,
+                            markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
+                            markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+                } finally {
+                    entry.release();
                 }
-                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                        entry.getLedgerId(), entry.getEntryId(), 
partitionIndex, largestBatchIndex,
-                        markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
-                        markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
-                entry.release();
             } else {
                 // in this case, the ledgers been removed except the current 
ledger
                 // and current ledger without any data

Reply via email to