This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 556d3d16c56 Fix `messageQueue` release message issue. (#16155)
556d3d16c56 is described below
commit 556d3d16c5651b626789f8b97a174ae5085dddb1
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Jun 21 17:13:00 2022 +0800
Fix `messageQueue` release message issue. (#16155)
(cherry picked from commit 141c44022a27be2fc07eab9827cfdb168e448953)
---
.../org/apache/pulsar/sql/presto/PulsarRecordCursor.java | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 195f40fd326..e70f574cf29 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -727,19 +727,17 @@ public class PulsarRecordCursor implements RecordCursor {
public void close() {
log.info("Closing cursor record");
- if (currentMessage != null) {
- currentMessage.release();
- }
-
- if (messageQueue != null) {
- messageQueue.drain(RawMessage::release);
- }
-
if (deserializeEntries != null) {
deserializeEntries.close().whenComplete((r, t) -> {
if (entryQueue != null) {
entryQueue.drain(Entry::release);
}
+ if (messageQueue != null) {
+ messageQueue.drain(RawMessage::release);
+ }
+ if (currentMessage != null) {
+ currentMessage.release();
+ }
});
}