This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dddedf06260 Fix potential NPE when concurrently aborting a Query
dddedf06260 is described below

commit dddedf0626052132d0e74054c129ec16e3a3bb6a
Author: Liao Lanyu <[email protected]>
AuthorDate: Wed Apr 10 14:44:34 2024 +0800

    Fix potential NPE when concurrently aborting a Query
---
 .../execution/exchange/sink/ShuffleSinkHandle.java         |  4 ++--
 .../execution/exchange/source/LocalSourceHandle.java       |  9 ---------
 .../iotdb/db/queryengine/execution/memory/MemoryPool.java  | 14 +++++++-------
 3 files changed, 9 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
index 7c31ab67e04..b9bf277b68b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
@@ -188,7 +188,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
     if (aborted || closed) {
       return;
     }
-    aborted = true;
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[StartAbortShuffleSinkHandle]");
     }
@@ -208,6 +207,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
       LOGGER.warn("Error occurred when try to abort channel.", firstException);
     }
     sinkListener.onAborted(this);
+    aborted = true;
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[EndAbortShuffleSinkHandle]");
     }
@@ -222,7 +222,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
     if (closed || aborted) {
       return;
     }
-    closed = true;
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[StartCloseShuffleSinkHandle]");
     }
@@ -242,6 +241,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
       LOGGER.warn("Error occurred when try to close channel.", firstException);
     }
     sinkListener.onFinish(this);
+    closed = true;
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[EndCloseShuffleSinkHandle]");
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index de37c64e460..7c6b56fd5ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -183,9 +183,6 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public void abort() {
-    if (aborted || closed) {
-      return;
-    }
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("[StartAbortLocalSourceHandle]");
@@ -208,9 +205,6 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public void abort(Throwable t) {
-    if (aborted || closed) {
-      return;
-    }
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("[StartAbortLocalSourceHandle]");
@@ -233,9 +227,6 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public void close() {
-    if (aborted || closed) {
-      return;
-    }
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("[StartCloseLocalSourceHandle]");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 3b8adc70d44..c43ba99c63a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -223,9 +223,9 @@ public class MemoryPool {
       String planNodeId,
       long bytesToReserve,
       long maxBytesCanReserve) {
-    Validate.notNull(queryId);
-    Validate.notNull(fragmentInstanceId);
-    Validate.notNull(planNodeId);
+    Validate.notNull(queryId, "queryId can not be null.");
+    Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be 
null.");
+    Validate.notNull(planNodeId, "planNodeId can not be null.");
     Validate.isTrue(
         bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
         "bytesToReserve should be in (0,maxBytesPerFI]. maxBytesPerFI: %d",
@@ -263,9 +263,9 @@ public class MemoryPool {
       String planNodeId,
       long bytesToReserve,
       long maxBytesCanReserve) {
-    Validate.notNull(queryId);
-    Validate.notNull(fragmentInstanceId);
-    Validate.notNull(planNodeId);
+    Validate.notNull(queryId, "queryId can not be null.");
+    Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be 
null.");
+    Validate.notNull(planNodeId, "planNodeId can not be null.");
     Validate.isTrue(
         bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
         "bytesToReserve should be in (0,maxBytesPerFI]. maxBytesPerFI: %d",
@@ -288,10 +288,10 @@ public class MemoryPool {
    */
   @SuppressWarnings("squid:S2445")
   public synchronized long tryCancel(ListenableFuture<Void> future) {
+    Validate.notNull(future, "The future to be cancelled can not be null.");
     // add synchronized on the future to avoid that the future is concurrently 
completed by
     // MemoryPool.free() which may lead to memory leak.
     synchronized (future) {
-      Validate.notNull(future, "The future to be cancelled can not be null.");
       // If the future is not a MemoryReservationFuture, it must have been 
completed.
       if (future.isDone()) {
         return 0L;

Reply via email to