This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 6751b020cdf [To dev/1.3] Fix potential memory leak of Memory Pool
6751b020cdf is described below
commit 6751b020cdf4d7c1d322992f2bb00473b58957dc
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Jul 11 12:12:02 2025 +0800
[To dev/1.3] Fix potential memory leak of Memory Pool
(cherry picked from commit 4be53debee7a6455ac3e8267ef31187ef0d5b092)
---
.../execution/exchange/SharedTsBlockQueue.java | 2 +-
.../queryengine/execution/exchange/sink/ISink.java | 4 +--
.../execution/exchange/sink/LocalSinkChannel.java | 14 ++++----
.../execution/exchange/sink/ShuffleSinkHandle.java | 40 +++++++++++++--------
.../execution/exchange/sink/SinkChannel.java | 10 +++---
.../fragment/FragmentInstanceExecution.java | 42 ++++++++++++++++++----
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 3 +-
.../queryengine/execution/exchange/StubSink.java | 6 ++--
8 files changed, 84 insertions(+), 37 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index dcd062052b7..ec854da306c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -74,7 +74,7 @@ public class SharedTsBlockQueue {
private ListenableFuture<Void> blockedOnMemory;
- private boolean closed = false;
+ private volatile boolean closed = false;
private boolean alreadyRegistered = false;
private LocalSourceHandle sourceHandle;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java
index 37ec1ba6fef..5ee71abb1cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ISink.java
@@ -71,7 +71,7 @@ public interface ISink extends Accountable {
*
* <p>Should only be called in abnormal case
*/
- void abort();
+ boolean abort();
/**
* Close the ISink. If this is an ISinkHandle, we should close all its channels. If this is an
@@ -80,7 +80,7 @@ public interface ISink extends Accountable {
*
* <p>Should only be called in normal case.
*/
- void close();
+ boolean close();
/** Return true if this ISink has been closed. Used in {@link Driver#isFinishedInternal()}. */
boolean isClosed();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
index 06ca3cf7965..91858a26d1b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
@@ -49,8 +49,8 @@ public class LocalSinkChannel implements ISinkChannel {
@SuppressWarnings("squid:S3077")
private volatile ListenableFuture<Void> blocked;
- private boolean aborted = false;
- private boolean closed = false;
+ private volatile boolean aborted = false;
+ private volatile boolean closed = false;
private boolean invokedOnFinished = false;
@@ -181,14 +181,14 @@ public class LocalSinkChannel implements ISinkChannel {
}
@Override
- public void abort() {
+ public boolean abort() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartAbortLocalSinkChannel]");
}
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
- return;
+ return false;
}
aborted = true;
Optional<Throwable> t = sinkListener.onAborted(this);
@@ -202,17 +202,18 @@ public class LocalSinkChannel implements ISinkChannel {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndAbortLocalSinkChannel]");
}
+ return true;
}
@Override
- public void close() {
+ public boolean close() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartCloseLocalSinkChannel]");
}
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
- return;
+ return false;
}
closed = true;
queue.close();
@@ -225,6 +226,7 @@ public class LocalSinkChannel implements ISinkChannel {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndCloseLocalSinkChannel]");
}
+ return true;
}
public SharedTsBlockQueue getSharedTsBlockQueue() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
index 90b135dcc60..1b7f175d123 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
@@ -190,18 +190,19 @@ public class ShuffleSinkHandle implements ISinkHandle {
}
@Override
- public void abort() {
+ public boolean abort() {
if (aborted || closed) {
- return;
+ return false;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartAbortShuffleSinkHandle]");
}
boolean meetError = false;
Exception firstException = null;
+ boolean selfAborted = true;
for (ISink channel : downStreamChannelList) {
try {
- channel.abort();
+ selfAborted = channel.abort();
} catch (Exception e) {
if (!meetError) {
firstException = e;
@@ -212,10 +213,15 @@ public class ShuffleSinkHandle implements ISinkHandle {
if (meetError) {
LOGGER.warn("Error occurred when try to abort channel.", firstException);
}
- sinkListener.onAborted(this);
- aborted = true;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("[EndAbortShuffleSinkHandle]");
+ if (selfAborted) {
+ sinkListener.onAborted(this);
+ aborted = true;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[EndAbortShuffleSinkHandle]");
+ }
+ return true;
+ } else {
+ return false;
}
}
@@ -224,18 +230,19 @@ public class ShuffleSinkHandle implements ISinkHandle {
// ShuffleSinkHandle while synchronized methods of ShuffleSinkHandle
// Lock ShuffleSinkHandle and wait to lock LocalSinkChannel
@Override
- public void close() {
+ public boolean close() {
if (closed || aborted) {
- return;
+ return false;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartCloseShuffleSinkHandle]");
}
boolean meetError = false;
Exception firstException = null;
+ boolean selfClosed = true;
for (ISink channel : downStreamChannelList) {
try {
- channel.close();
+ selfClosed = channel.close();
} catch (Exception e) {
if (!meetError) {
firstException = e;
@@ -246,10 +253,15 @@ public class ShuffleSinkHandle implements ISinkHandle {
if (meetError) {
LOGGER.warn("Error occurred when try to close channel.", firstException);
}
- sinkListener.onFinish(this);
- closed = true;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("[EndCloseShuffleSinkHandle]");
+ if (selfClosed) {
+ sinkListener.onFinish(this);
+ closed = true;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[EndCloseShuffleSinkHandle]");
+ }
+ return true;
+ } else {
+ return false;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index 8915938eb03..ca6fdadc993 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -239,12 +239,12 @@ public class SinkChannel implements ISinkChannel {
}
@Override
- public synchronized void abort() {
+ public synchronized boolean abort() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartAbortSinkChannel]");
}
if (aborted || closed) {
- return;
+ return false;
}
sequenceIdToTsBlock.clear();
if (blocked != null) {
@@ -265,15 +265,16 @@ public class SinkChannel implements ISinkChannel {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndAbortSinkChannel]");
}
+ return true;
}
@Override
- public synchronized void close() {
+ public synchronized boolean close() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartCloseSinkChannel]");
}
if (closed || aborted) {
- return;
+ return false;
}
sequenceIdToTsBlock.clear();
if (blocked != null) {
@@ -294,6 +295,7 @@ public class SinkChannel implements ISinkChannel {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndCloseSinkChannel]");
}
+ return true;
}
private void invokeOnFinished() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index f5e96a980ce..9a825a055c4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -304,7 +304,16 @@ public class FragmentInstanceExecution {
staticsRemoved = true;
statisticsLock.writeLock().unlock();
- clearShuffleSinkHandle(newState);
+ // must clear shuffle sink handle before driver close
+ // because in failed state, if we called driver.close() first, we would eventually call
+ // sink.setNoMoreTsBlocks(), which may mislead upstream into believing downstream ended normally
+ try {
+ clearShuffleSinkHandle(newState);
+ } catch (Throwable t) {
+ LOGGER.error(
+ "Errors occurred while attempting to release sink,
potentially leading to resource leakage.",
+ t);
+ }
// close the driver after sink is aborted or closed because in driver.close() it
// will try to call ISink.setNoMoreTsBlocks()
@@ -317,14 +326,33 @@ public class FragmentInstanceExecution {
// release file handlers
context.releaseResourceWhenAllDriversAreClosed();
- // delete tmp file if exists
- deleteTmpFile();
+ try {
+ // delete tmp file if exists
+ deleteTmpFile();
+ } catch (Throwable t) {
+ LOGGER.error(
+ "Errors occurred while attempting to delete tmp files,
potentially leading to resource leakage.",
+ t);
+ }
- // release memory
- exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
- instanceId.getQueryId().getId(),
instanceId.getFragmentInstanceId(), true);
+ try {
+ // release memory
+ exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
+ instanceId.getQueryId().getId(),
instanceId.getFragmentInstanceId(), true);
+ } catch (Throwable t) {
+ LOGGER.error(
+ "Errors occurred while attempting to deRegister FI from
Memory Pool, potentially leading to resource leakage, status is {}.",
+ newState,
+ t);
+ }
- context.releaseMemoryReservationManager();
+ try {
+ context.releaseMemoryReservationManager();
+ } catch (Throwable t) {
+ LOGGER.error(
+ "Errors occurred while attempting to release memory,
potentially leading to resource leakage.",
+ t);
+ }
if (newState.isFailed()) {
scheduler.abortFragmentInstance(instanceId);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 9fd7053ba92..785395af8fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -111,7 +111,8 @@ public class ErrorHandlingUtils {
|| status.getCode() ==
TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()
|| status.getCode() ==
TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()
|| status.getCode() ==
TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()
- || status.getCode() ==
TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()) {
+ || status.getCode() ==
TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()
+ || status.getCode() == TSStatusCode.QUERY_TIMEOUT.getStatusCode())
{
LOGGER.info(message);
} else {
LOGGER.warn(message, e);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java
index 7fce04723e3..6217e6a8e8b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/StubSink.java
@@ -92,15 +92,17 @@ public class StubSink implements ISink {
}
@Override
- public void abort() {
+ public boolean abort() {
closed = true;
tsBlocks.clear();
+ return true;
}
@Override
- public void close() {
+ public boolean close() {
closed = true;
tsBlocks.clear();
+ return true;
}
@Override