This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b68fa32ea853048c0e0d163f5d8d6efc5f5ed539
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Jun 21 17:13:00 2022 +0800

    Fix `messageQueue` release message issue. (#16155)

    (cherry picked from commit 141c44022a27be2fc07eab9827cfdb168e448953)
---
 .../org/apache/pulsar/sql/presto/PulsarRecordCursor.java | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 1ea232203d3..558f6b47e9d 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -726,19 +726,17 @@ public class PulsarRecordCursor implements RecordCursor {
     public void close() {
         log.info("Closing cursor record");
 
-        if (currentMessage != null) {
-            currentMessage.release();
-        }
-
-        if (messageQueue != null) {
-            messageQueue.drain(RawMessage::release);
-        }
-
         if (deserializeEntries != null) {
             deserializeEntries.close().whenComplete((r, t) -> {
                 if (entryQueue != null) {
                     entryQueue.drain(Entry::release);
                 }
+                if (messageQueue != null) {
+                    messageQueue.drain(RawMessage::release);
+                }
+                if (currentMessage != null) {
+                    currentMessage.release();
+                }
             });
         }
 
