This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TimeoutError in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e0770b0d0aa4909d7c4ee00f3ce1b84f1bdde3bc Author: JackieTien97 <[email protected]> AuthorDate: Wed Jul 9 19:00:51 2025 +0800 Fix potential memory leak --- .../execution/exchange/SharedTsBlockQueue.java | 2 +- .../queryengine/execution/exchange/sink/ISink.java | 4 +-- .../execution/exchange/sink/LocalSinkChannel.java | 14 ++++---- .../execution/exchange/sink/ShuffleSinkHandle.java | 40 +++++++++++++-------- .../execution/exchange/sink/SinkChannel.java | 10 +++--- .../fragment/FragmentInstanceExecution.java | 41 +++++++++++++++++----- .../apache/iotdb/db/utils/ErrorHandlingUtils.java | 3 +- .../queryengine/execution/exchange/StubSink.java | 6 ++-- .../apache/iotdb/commons/utils/StatusUtils.java | 1 - 9 files changed, 82 insertions(+), 39 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index 73e0573160c..d742c65dda2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -74,7 +74,7 @@ public class SharedTsBlockQueue { private ListenableFuture<Void> blockedOnMemory; - private boolean closed = false; + private volatile boolean closed = false; private boolean alreadyRegistered = false; private LocalSourceHandle sourceHandle; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java index 37ec1ba6fef..5ee71abb1cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java @@ -71,7 +71,7 @@ public interface ISink extends Accountable { * * <p>Should only be called in abnormal case */ - void abort(); + boolean abort(); /** * Close the ISink. If this is an ISinkHandle, we should close all its channels. If this is an @@ -80,7 +80,7 @@ public interface ISink extends Accountable { * * <p>Should only be called in normal case. */ - void close(); + boolean close(); /** Return true if this ISink has been closed. Used in {@link Driver#isFinishedInternal()}. */ boolean isClosed(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java index e29cf5a455c..c73128e6cba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java @@ -50,8 +50,8 @@ public class LocalSinkChannel implements ISinkChannel { @SuppressWarnings("squid:S3077") private volatile ListenableFuture<Void> blocked; - private boolean aborted = false; - private boolean closed = false; + private volatile boolean aborted = false; + private volatile boolean closed = false; private boolean invokedOnFinished = false; @@ -182,14 +182,14 @@ public class LocalSinkChannel implements ISinkChannel { } @Override - public void abort() { + public boolean abort() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartAbortLocalSinkChannel]"); } synchronized (queue) { synchronized (this) { if (aborted || closed) { - return; + return false; } aborted = true; Optional<Throwable> t = sinkListener.onAborted(this); @@ -203,17 +203,18 @@ public class LocalSinkChannel implements ISinkChannel { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[EndAbortLocalSinkChannel]"); } + return true; } @Override - public void close() { + public boolean close() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartCloseLocalSinkChannel]"); } synchronized (queue) { synchronized (this) { if (aborted || closed) { - return; + return false; } closed = true; queue.close(); @@ -226,6 +227,7 @@ public class LocalSinkChannel implements ISinkChannel { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[EndCloseLocalSinkChannel]"); } + return true; } public SharedTsBlockQueue getSharedTsBlockQueue() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java index d8624258751..7865de9f465 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java @@ -190,18 +190,19 @@ public class ShuffleSinkHandle implements ISinkHandle { } @Override - public void abort() { + public boolean abort() { if (aborted || closed) { - return; + return false; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartAbortShuffleSinkHandle]"); } boolean meetError = false; Exception firstException = null; + boolean selfAborted = true; for (ISink channel : downStreamChannelList) { try { - channel.abort(); + selfAborted = channel.abort(); } catch (Exception e) { if (!meetError) { firstException = e; @@ -212,10 +213,15 @@ public class ShuffleSinkHandle implements ISinkHandle { if (meetError) { LOGGER.warn("Error occurred when try to abort channel.", firstException); } - sinkListener.onAborted(this); - aborted = true; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("[EndAbortShuffleSinkHandle]"); + if (selfAborted) { + sinkListener.onAborted(this); + aborted = true; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[EndAbortShuffleSinkHandle]"); + } + return true; + } else { + return false; } } @@ -224,18 +230,19 @@ public class ShuffleSinkHandle implements ISinkHandle { // ShuffleSinkHandle while synchronized methods of ShuffleSinkHandle // Lock ShuffleSinkHandle and wait to lock LocalSinkChannel @Override - public void close() { + public boolean close() { if (closed || aborted) { - return; + return false; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartCloseShuffleSinkHandle]"); } boolean meetError = false; Exception firstException = null; + boolean selfClosed = true; for (ISink channel : downStreamChannelList) { try { - channel.close(); + selfClosed = channel.close(); } catch (Exception e) { if (!meetError) { firstException = e; @@ -246,10 +253,15 @@ public class ShuffleSinkHandle implements ISinkHandle { if (meetError) { LOGGER.warn("Error occurred when try to close channel.", firstException); } - sinkListener.onFinish(this); - closed = true; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("[EndCloseShuffleSinkHandle]"); + if (selfClosed) { + sinkListener.onFinish(this); + closed = true; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[EndCloseShuffleSinkHandle]"); + } + return true; + } else { + return false; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java index 58cd737fc20..516798f3daa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java @@ -239,12 +239,12 @@ public class SinkChannel implements ISinkChannel { } @Override - public synchronized void abort() { + public synchronized boolean abort() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartAbortSinkChannel]"); } if (aborted || closed) { - return; + return false; } sequenceIdToTsBlock.clear(); if (blocked != null) { @@ -265,15 +265,16 @@ public class SinkChannel implements ISinkChannel { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[EndAbortSinkChannel]"); } + return true; } @Override - public synchronized void close() { + public synchronized boolean close() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartCloseSinkChannel]"); } if (closed || aborted) { - return; + return false; } sequenceIdToTsBlock.clear(); if (blocked != null) { @@ -294,6 +295,7 @@ public class SinkChannel implements ISinkChannel { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[EndCloseSinkChannel]"); } + return true; } private void invokeOnFinished() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index 1094266fa39..e578206f1ee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -288,8 +288,6 @@ public class FragmentInstanceExecution { staticsRemoved = true; statisticsLock.writeLock().unlock(); - clearShuffleSinkHandle(newState); - // close the driver after sink is aborted or closed because in driver.close() it // will try to call ISink.setNoMoreTsBlocks() for (IDriver driver : drivers) { @@ -301,14 +299,41 @@ public class FragmentInstanceExecution { // release file handlers context.releaseResourceWhenAllDriversAreClosed(); - // delete tmp file if exists - deleteTmpFile(); + try { + clearShuffleSinkHandle(newState); + } catch (Throwable t) { + LOGGER.error( + "Errors occurred while attempting to release sink, potentially leading to resource leakage.", + t); + } + + try { + // delete tmp file if exists + deleteTmpFile(); + } catch (Throwable t) { + LOGGER.error( + "Errors occurred while attempting to delete tmp files, potentially leading to resource leakage.", + t); + } - // release memory - exchangeManager.deRegisterFragmentInstanceFromMemoryPool( - instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true); + try { + // release memory + exchangeManager.deRegisterFragmentInstanceFromMemoryPool( + instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true); + } catch (Throwable t) { + LOGGER.error( + "Errors occurred while attempting to deRegister FI from Memory Pool, potentially leading to resource leakage, status is {}.", + newState, + t); + } - context.releaseMemoryReservationManager(); + try { + context.releaseMemoryReservationManager(); + } catch (Throwable t) { + LOGGER.error( + "Errors occurred while attempting to release memory, potentially leading to resource leakage.", + t); + } if (newState.isFailed()) { scheduler.abortFragmentInstance(instanceId, context.getFailureCause().orElse(null)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index d724ea5f886..62b642f9ae5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -119,7 +119,8 @@ public class ErrorHandlingUtils { || status.getCode() == TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode() || status.getCode() == TSStatusCode.NO_AVAILABLE_REPLICA.getStatusCode() || status.getCode() == TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode() - || status.getCode() == TSStatusCode.QUERY_EXECUTION_MEMORY_NOT_ENOUGH.getStatusCode()) { + || status.getCode() == TSStatusCode.QUERY_EXECUTION_MEMORY_NOT_ENOUGH.getStatusCode() + || status.getCode() == TSStatusCode.QUERY_TIMEOUT.getStatusCode()) { LOGGER.info(message); } else { LOGGER.warn(message, e); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java index 7fce04723e3..6217e6a8e8b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java @@ -92,15 +92,17 @@ public class StubSink implements ISink { } @Override - public void abort() { + public boolean abort() { closed = true; tsBlocks.clear(); + return true; } @Override - public void close() { + public boolean close() { closed = true; tsBlocks.clear(); + return true; } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java index ac64699b9cf..ed327969c35 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java @@ -68,7 +68,6 @@ public class StatusUtils { NEED_RETRY.add(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode()); NEED_RETRY.add(TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()); NEED_RETRY.add(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()); - NEED_RETRY.add(TSStatusCode.QUERY_EXECUTION_MEMORY_NOT_ENOUGH.getStatusCode()); NEED_RETRY.add(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()); }
