This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch SinkChannelNPE in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ca138649dd390c2eb1dd3e28c25f29c255fc501a Author: JackieTien97 <[email protected]> AuthorDate: Tue Apr 11 21:21:08 2023 +0800 Fix potential NPE in SinkChannel --- .../mpp/execution/exchange/sink/ShuffleSinkHandle.java | 4 ++-- .../db/mpp/execution/exchange/sink/SinkChannel.java | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java index 6f9b617e2e..a4d3f7c198 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java @@ -166,7 +166,7 @@ public class ShuffleSinkHandle implements ISinkHandle { @Override public synchronized void abort() { - if (aborted) { + if (aborted || closed) { return; } LOGGER.debug("[StartAbortShuffleSinkHandle]"); @@ -192,7 +192,7 @@ public class ShuffleSinkHandle implements ISinkHandle { @Override public synchronized void close() { - if (closed) { + if (closed || aborted) { return; } LOGGER.debug("[StartCloseShuffleSinkHandle]"); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java index b32028bee1..1b027ba2ab 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java @@ -211,11 +211,13 @@ public class SinkChannel implements ISinkChannel { @Override public synchronized void abort() { LOGGER.debug("[StartAbortSinkChannel]"); - if (aborted) { + if (aborted || closed) { return; } sequenceIdToTsBlock.clear(); - bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked); + if (blocked != null) { + bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked); + } if (bufferRetainedSizeInBytes > 0) { localMemoryManager .getQueryPool() @@ -234,11 +236,13 @@ public class SinkChannel implements ISinkChannel { @Override public synchronized void close() { LOGGER.debug("[StartCloseSinkChannel]"); - if (closed) { + if (closed || aborted) { return; } sequenceIdToTsBlock.clear(); - bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked); + if (blocked != null) { + bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked); + } if (bufferRetainedSizeInBytes > 0) { localMemoryManager .getQueryPool() @@ -363,7 +367,11 @@ public class SinkChannel implements ISinkChannel { // region ============ ISinkChannel related ============ - public void open() { + @Override + public synchronized void open() { + if (aborted || closed) { + return; + } // SinkChannel is opened when ShuffleSinkHandle choose it as the next channel this.blocked = localMemoryManager
