This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TimeoutError-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 008eb7e0f6446b96c4c6d262d68e12ce307151cd Author: Jackie Tien <[email protected]> AuthorDate: Thu Jul 10 17:26:29 2025 +0800 Fix potential memory leak of Memory Pool (cherry picked from commit 4be53debee7a6455ac3e8267ef31187ef0d5b092) --- .../execution/exchange/SharedTsBlockQueue.java | 2 +- .../queryengine/execution/exchange/sink/ISink.java | 4 +-- .../execution/exchange/sink/LocalSinkChannel.java | 14 ++++---- .../execution/exchange/sink/ShuffleSinkHandle.java | 40 +++++++++++++-------- .../execution/exchange/sink/SinkChannel.java | 10 +++--- .../fragment/FragmentInstanceExecution.java | 42 ++++++++++++++++++---- .../apache/iotdb/db/utils/ErrorHandlingUtils.java | 3 +- .../queryengine/execution/exchange/StubSink.java | 6 ++-- 8 files changed, 84 insertions(+), 37 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index dcd062052b7..ec854da306c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -74,7 +74,7 @@ public class SharedTsBlockQueue { private ListenableFuture<Void> blockedOnMemory; - private boolean closed = false; + private volatile boolean closed = false; private boolean alreadyRegistered = false; private LocalSourceHandle sourceHandle; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java index 37ec1ba6fef..5ee71abb1cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java @@ -71,7 +71,7 @@ public interface ISink extends Accountable { * * <p>Should only be called in abnormal case */ - void abort(); + boolean abort(); /** * Close the ISink. If this is an ISinkHandle, we should close all its channels. If this is an @@ -80,7 +80,7 @@ public interface ISink extends Accountable { * * <p>Should only be called in normal case. */ - void close(); + boolean close(); /** Return true if this ISink has been closed. Used in {@link Driver#isFinishedInternal()}. */ boolean isClosed(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java index 06ca3cf7965..91858a26d1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java @@ -49,8 +49,8 @@ public class LocalSinkChannel implements ISinkChannel { @SuppressWarnings("squid:S3077") private volatile ListenableFuture<Void> blocked; - private boolean aborted = false; - private boolean closed = false; + private volatile boolean aborted = false; + private volatile boolean closed = false; private boolean invokedOnFinished = false; @@ -181,14 +181,14 @@ public class LocalSinkChannel implements ISinkChannel { } @Override - public void abort() { + public boolean abort() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartAbortLocalSinkChannel]"); } synchronized (queue) { synchronized (this) { if (aborted || closed) { - return; + return false; } aborted = true; Optional<Throwable> t = sinkListener.onAborted(this); @@ -202,17 +202,18 @@ public class LocalSinkChannel implements ISinkChannel { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[EndAbortLocalSinkChannel]"); } + return true; } @Override - public void close() { + public boolean close() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartCloseLocalSinkChannel]"); } synchronized (queue) { synchronized (this) { if (aborted || closed) { - return; + return false; } closed = true; queue.close(); @@ -225,6 +226,7 @@ public class LocalSinkChannel implements ISinkChannel { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[EndCloseLocalSinkChannel]"); } + return true; } public SharedTsBlockQueue getSharedTsBlockQueue() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java index 90b135dcc60..1b7f175d123 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java @@ -190,18 +190,19 @@ public class ShuffleSinkHandle implements ISinkHandle { } @Override - public void abort() { + public boolean abort() { if (aborted || closed) { - return; + return false; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartAbortShuffleSinkHandle]"); } boolean meetError = false; Exception firstException = null; + boolean selfAborted = true; for (ISink channel : downStreamChannelList) { try { - channel.abort(); + selfAborted = channel.abort(); } catch (Exception e) { if (!meetError) { firstException = e; @@ -212,10 +213,15 @@ public class ShuffleSinkHandle implements ISinkHandle { if (meetError) { LOGGER.warn("Error occurred when try to abort channel.", firstException); } - sinkListener.onAborted(this); - aborted = true; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("[EndAbortShuffleSinkHandle]"); + if (selfAborted) { + sinkListener.onAborted(this); + aborted = true; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[EndAbortShuffleSinkHandle]"); + } + return true; + } else { + return false; } } @@ -224,18 +230,19 @@ public class ShuffleSinkHandle implements ISinkHandle { // ShuffleSinkHandle while synchronized methods of ShuffleSinkHandle // Lock ShuffleSinkHandle and wait to lock LocalSinkChannel @Override - public void close() { + public boolean close() { if (closed || aborted) { - return; + return false; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartCloseShuffleSinkHandle]"); } boolean meetError = false; Exception firstException = null; + boolean selfClosed = true; for (ISink channel : downStreamChannelList) { try { - channel.close(); + selfClosed = channel.close(); } catch (Exception e) { if (!meetError) { firstException = e; @@ -246,10 +253,15 @@ public class ShuffleSinkHandle implements ISinkHandle { if (meetError) { LOGGER.warn("Error occurred when try to close channel.", firstException); } - sinkListener.onFinish(this); - closed = true; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("[EndCloseShuffleSinkHandle]"); + if (selfClosed) { + sinkListener.onFinish(this); + closed = true; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[EndCloseShuffleSinkHandle]"); + } + return true; + } else { + return false; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java index 8915938eb03..ca6fdadc993 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java @@ -239,12 +239,12 @@ public class SinkChannel implements ISinkChannel { } @Override - public synchronized void abort() { + public synchronized boolean abort() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartAbortSinkChannel]"); } if (aborted || closed) { - return; + return false; } sequenceIdToTsBlock.clear(); if (blocked != null) { @@ -265,15 +265,16 @@ public class SinkChannel implements ISinkChannel { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[EndAbortSinkChannel]"); } + return true; } @Override - public synchronized void close() { + public synchronized boolean close() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartCloseSinkChannel]"); } if (closed || aborted) { - return; + return false; } sequenceIdToTsBlock.clear(); if (blocked != null) { @@ -294,6 +295,7 @@ public class SinkChannel implements ISinkChannel { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[EndCloseSinkChannel]"); } + return true; } private void invokeOnFinished() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index f5e96a980ce..9a825a055c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -304,7 +304,16 @@ public class FragmentInstanceExecution { staticsRemoved = true; statisticsLock.writeLock().unlock(); - clearShuffleSinkHandle(newState); + // must clear shuffle sink handle before driver close + // because in failed state, if we can driver.close firstly, we will finally call + // sink.setNoMoreTsBlocks() which may mislead upstream that downstream normally ends + try { + clearShuffleSinkHandle(newState); + } catch (Throwable t) { + LOGGER.error( + "Errors occurred while attempting to release sink, potentially leading to resource leakage.", + t); + } // close the driver after sink is aborted or closed because in driver.close() it // will try to call ISink.setNoMoreTsBlocks() @@ -317,14 +326,33 @@ public class FragmentInstanceExecution { // release file handlers context.releaseResourceWhenAllDriversAreClosed(); - // delete tmp file if exists - deleteTmpFile(); + try { + // delete tmp file if exists + deleteTmpFile(); + } catch (Throwable t) { + LOGGER.error( + "Errors occurred while attempting to delete tmp files, potentially leading to resource leakage.", + t); + } - // release memory - exchangeManager.deRegisterFragmentInstanceFromMemoryPool( - instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true); + try { + // release memory + exchangeManager.deRegisterFragmentInstanceFromMemoryPool( + instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true); + } catch (Throwable t) { + LOGGER.error( + "Errors occurred while attempting to deRegister FI from Memory Pool, potentially leading to resource leakage, status is {}.", + newState, + t); + } - context.releaseMemoryReservationManager(); + try { + context.releaseMemoryReservationManager(); + } catch (Throwable t) { + LOGGER.error( + "Errors occurred while attempting to release memory, potentially leading to resource leakage.", + t); + } if (newState.isFailed()) { scheduler.abortFragmentInstance(instanceId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 9fd7053ba92..785395af8fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -111,7 +111,8 @@ public class ErrorHandlingUtils { || status.getCode() == TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode() || status.getCode() == TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode() || status.getCode() == TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode() - || status.getCode() == TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()) { + || status.getCode() == TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode() + || status.getCode() == TSStatusCode.QUERY_TIMEOUT.getStatusCode()) { LOGGER.info(message); } else { LOGGER.warn(message, e); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java index 7fce04723e3..6217e6a8e8b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java @@ -92,15 +92,17 @@ public class StubSink implements ISink { } @Override - public void abort() { + public boolean abort() { closed = true; tsBlocks.clear(); + return true; } @Override - public void close() { + public boolean close() { closed = true; tsBlocks.clear(); + return true; } @Override
