This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TimeoutError-1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 008eb7e0f6446b96c4c6d262d68e12ce307151cd
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jul 10 17:26:29 2025 +0800

    Fix potential memory leak of Memory Pool
    
    (cherry picked from commit 4be53debee7a6455ac3e8267ef31187ef0d5b092)
---
 .../execution/exchange/SharedTsBlockQueue.java     |  2 +-
 .../queryengine/execution/exchange/sink/ISink.java |  4 +--
 .../execution/exchange/sink/LocalSinkChannel.java  | 14 ++++----
 .../execution/exchange/sink/ShuffleSinkHandle.java | 40 +++++++++++++--------
 .../execution/exchange/sink/SinkChannel.java       | 10 +++---
 .../fragment/FragmentInstanceExecution.java        | 42 ++++++++++++++++++----
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  |  3 +-
 .../queryengine/execution/exchange/StubSink.java   |  6 ++--
 8 files changed, 84 insertions(+), 37 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index dcd062052b7..ec854da306c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -74,7 +74,7 @@ public class SharedTsBlockQueue {
 
   private ListenableFuture<Void> blockedOnMemory;
 
-  private boolean closed = false;
+  private volatile boolean closed = false;
   private boolean alreadyRegistered = false;
 
   private LocalSourceHandle sourceHandle;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java
index 37ec1ba6fef..5ee71abb1cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java
@@ -71,7 +71,7 @@ public interface ISink extends Accountable {
    *
    * <p>Should only be called in abnormal case
    */
-  void abort();
+  boolean abort();
 
   /**
    * Close the ISink. If this is an ISinkHandle, we should close all its 
channels. If this is an
@@ -80,7 +80,7 @@ public interface ISink extends Accountable {
    *
    * <p>Should only be called in normal case.
    */
-  void close();
+  boolean close();
 
   /** Return true if this ISink has been closed. Used in {@link 
Driver#isFinishedInternal()}. */
   boolean isClosed();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
index 06ca3cf7965..91858a26d1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
@@ -49,8 +49,8 @@ public class LocalSinkChannel implements ISinkChannel {
   @SuppressWarnings("squid:S3077")
   private volatile ListenableFuture<Void> blocked;
 
-  private boolean aborted = false;
-  private boolean closed = false;
+  private volatile boolean aborted = false;
+  private volatile boolean closed = false;
 
   private boolean invokedOnFinished = false;
 
@@ -181,14 +181,14 @@ public class LocalSinkChannel implements ISinkChannel {
   }
 
   @Override
-  public void abort() {
+  public boolean abort() {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[StartAbortLocalSinkChannel]");
     }
     synchronized (queue) {
       synchronized (this) {
         if (aborted || closed) {
-          return;
+          return false;
         }
         aborted = true;
         Optional<Throwable> t = sinkListener.onAborted(this);
@@ -202,17 +202,18 @@ public class LocalSinkChannel implements ISinkChannel {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[EndAbortLocalSinkChannel]");
     }
+    return true;
   }
 
   @Override
-  public void close() {
+  public boolean close() {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[StartCloseLocalSinkChannel]");
     }
     synchronized (queue) {
       synchronized (this) {
         if (aborted || closed) {
-          return;
+          return false;
         }
         closed = true;
         queue.close();
@@ -225,6 +226,7 @@ public class LocalSinkChannel implements ISinkChannel {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[EndCloseLocalSinkChannel]");
     }
+    return true;
   }
 
   public SharedTsBlockQueue getSharedTsBlockQueue() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
index 90b135dcc60..1b7f175d123 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
@@ -190,18 +190,19 @@ public class ShuffleSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void abort() {
+  public boolean abort() {
     if (aborted || closed) {
-      return;
+      return false;
     }
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[StartAbortShuffleSinkHandle]");
     }
     boolean meetError = false;
     Exception firstException = null;
+    boolean selfAborted = true;
     for (ISink channel : downStreamChannelList) {
       try {
-        channel.abort();
+        selfAborted = channel.abort();
       } catch (Exception e) {
         if (!meetError) {
           firstException = e;
@@ -212,10 +213,15 @@ public class ShuffleSinkHandle implements ISinkHandle {
     if (meetError) {
       LOGGER.warn("Error occurred when try to abort channel.", firstException);
     }
-    sinkListener.onAborted(this);
-    aborted = true;
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("[EndAbortShuffleSinkHandle]");
+    if (selfAborted) {
+      sinkListener.onAborted(this);
+      aborted = true;
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("[EndAbortShuffleSinkHandle]");
+      }
+      return true;
+    } else {
+      return false;
     }
   }
 
@@ -224,18 +230,19 @@ public class ShuffleSinkHandle implements ISinkHandle {
   // ShuffleSinkHandle while synchronized methods of ShuffleSinkHandle
   // Lock ShuffleSinkHandle and wait to lock LocalSinkChannel
   @Override
-  public void close() {
+  public boolean close() {
     if (closed || aborted) {
-      return;
+      return false;
     }
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[StartCloseShuffleSinkHandle]");
     }
     boolean meetError = false;
     Exception firstException = null;
+    boolean selfClosed = true;
     for (ISink channel : downStreamChannelList) {
       try {
-        channel.close();
+        selfClosed = channel.close();
       } catch (Exception e) {
         if (!meetError) {
           firstException = e;
@@ -246,10 +253,15 @@ public class ShuffleSinkHandle implements ISinkHandle {
     if (meetError) {
       LOGGER.warn("Error occurred when try to close channel.", firstException);
     }
-    sinkListener.onFinish(this);
-    closed = true;
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("[EndCloseShuffleSinkHandle]");
+    if (selfClosed) {
+      sinkListener.onFinish(this);
+      closed = true;
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("[EndCloseShuffleSinkHandle]");
+      }
+      return true;
+    } else {
+      return false;
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index 8915938eb03..ca6fdadc993 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -239,12 +239,12 @@ public class SinkChannel implements ISinkChannel {
   }
 
   @Override
-  public synchronized void abort() {
+  public synchronized boolean abort() {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[StartAbortSinkChannel]");
     }
     if (aborted || closed) {
-      return;
+      return false;
     }
     sequenceIdToTsBlock.clear();
     if (blocked != null) {
@@ -265,15 +265,16 @@ public class SinkChannel implements ISinkChannel {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[EndAbortSinkChannel]");
     }
+    return true;
   }
 
   @Override
-  public synchronized void close() {
+  public synchronized boolean close() {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[StartCloseSinkChannel]");
     }
     if (closed || aborted) {
-      return;
+      return false;
     }
     sequenceIdToTsBlock.clear();
     if (blocked != null) {
@@ -294,6 +295,7 @@ public class SinkChannel implements ISinkChannel {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("[EndCloseSinkChannel]");
     }
+    return true;
   }
 
   private void invokeOnFinished() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index f5e96a980ce..9a825a055c4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -304,7 +304,16 @@ public class FragmentInstanceExecution {
             staticsRemoved = true;
             statisticsLock.writeLock().unlock();
 
-            clearShuffleSinkHandle(newState);
+            // must clear shuffle sink handle before driver close
+            // because in failed state, if we call driver.close() first, we will
+            // eventually call sink.setNoMoreTsBlocks(), which may mislead upstream
+            // into thinking that downstream ended normally
downstream normally ends
+            try {
+              clearShuffleSinkHandle(newState);
+            } catch (Throwable t) {
+              LOGGER.error(
+                  "Errors occurred while attempting to release sink, 
potentially leading to resource leakage.",
+                  t);
+            }
 
             // close the driver after sink is aborted or closed because in 
driver.close() it
             // will try to call ISink.setNoMoreTsBlocks()
@@ -317,14 +326,33 @@ public class FragmentInstanceExecution {
             // release file handlers
             context.releaseResourceWhenAllDriversAreClosed();
 
-            // delete tmp file if exists
-            deleteTmpFile();
+            try {
+              // delete tmp file if exists
+              deleteTmpFile();
+            } catch (Throwable t) {
+              LOGGER.error(
+                  "Errors occurred while attempting to delete tmp files, 
potentially leading to resource leakage.",
+                  t);
+            }
 
-            // release memory
-            exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
-                instanceId.getQueryId().getId(), 
instanceId.getFragmentInstanceId(), true);
+            try {
+              // release memory
+              exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
+                  instanceId.getQueryId().getId(), 
instanceId.getFragmentInstanceId(), true);
+            } catch (Throwable t) {
+              LOGGER.error(
+                  "Errors occurred while attempting to deRegister FI from 
Memory Pool, potentially leading to resource leakage, status is {}.",
+                  newState,
+                  t);
+            }
 
-            context.releaseMemoryReservationManager();
+            try {
+              context.releaseMemoryReservationManager();
+            } catch (Throwable t) {
+              LOGGER.error(
+                  "Errors occurred while attempting to release memory, 
potentially leading to resource leakage.",
+                  t);
+            }
 
             if (newState.isFailed()) {
               scheduler.abortFragmentInstance(instanceId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 9fd7053ba92..785395af8fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -111,7 +111,8 @@ public class ErrorHandlingUtils {
             || status.getCode() == 
TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()
             || status.getCode() == 
TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()
             || status.getCode() == 
TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()
-            || status.getCode() == 
TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()) {
+            || status.getCode() == 
TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()
+            || status.getCode() == TSStatusCode.QUERY_TIMEOUT.getStatusCode()) 
{
           LOGGER.info(message);
         } else {
           LOGGER.warn(message, e);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java
index 7fce04723e3..6217e6a8e8b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java
@@ -92,15 +92,17 @@ public class StubSink implements ISink {
   }
 
   @Override
-  public void abort() {
+  public boolean abort() {
     closed = true;
     tsBlocks.clear();
+    return true;
   }
 
   @Override
-  public void close() {
+  public boolean close() {
     closed = true;
     tsBlocks.clear();
+    return true;
   }
 
   @Override

Reply via email to