This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch SinkChannelNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ca138649dd390c2eb1dd3e28c25f29c255fc501a
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Apr 11 21:21:08 2023 +0800

    Fix potential NPE in SinkChannel
---
 .../mpp/execution/exchange/sink/ShuffleSinkHandle.java |  4 ++--
 .../db/mpp/execution/exchange/sink/SinkChannel.java    | 18 +++++++++++++-----
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 6f9b617e2e..a4d3f7c198 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -166,7 +166,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   @Override
   public synchronized void abort() {
-    if (aborted) {
+    if (aborted || closed) {
       return;
     }
     LOGGER.debug("[StartAbortShuffleSinkHandle]");
@@ -192,7 +192,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   @Override
   public synchronized void close() {
-    if (closed) {
+    if (closed || aborted) {
       return;
     }
     LOGGER.debug("[StartCloseShuffleSinkHandle]");
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index b32028bee1..1b027ba2ab 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -211,11 +211,13 @@ public class SinkChannel implements ISinkChannel {
   @Override
   public synchronized void abort() {
     LOGGER.debug("[StartAbortSinkChannel]");
-    if (aborted) {
+    if (aborted || closed) {
       return;
     }
     sequenceIdToTsBlock.clear();
-    bufferRetainedSizeInBytes -= 
localMemoryManager.getQueryPool().tryCancel(blocked);
+    if (blocked != null) {
+      bufferRetainedSizeInBytes -= 
localMemoryManager.getQueryPool().tryCancel(blocked);
+    }
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
           .getQueryPool()
@@ -234,11 +236,13 @@ public class SinkChannel implements ISinkChannel {
   @Override
   public synchronized void close() {
     LOGGER.debug("[StartCloseSinkChannel]");
-    if (closed) {
+    if (closed || aborted) {
       return;
     }
     sequenceIdToTsBlock.clear();
-    bufferRetainedSizeInBytes -= 
localMemoryManager.getQueryPool().tryComplete(blocked);
+    if (blocked != null) {
+      bufferRetainedSizeInBytes -= 
localMemoryManager.getQueryPool().tryComplete(blocked);
+    }
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
           .getQueryPool()
@@ -363,7 +367,11 @@ public class SinkChannel implements ISinkChannel {
 
   // region ============ ISinkChannel related ============
 
-  public void open() {
+  @Override
+  public synchronized void open() {
+    if (aborted || closed) {
+      return;
+    }
     // SinkChannel is opened when ShuffleSinkHandle choose it as the next 
channel
     this.blocked =
         localMemoryManager

Reply via email to