This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch timeoutSink in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e1c86a60b1b5161036d4b169b05cdbf625701be9 Author: lancelly <[email protected]> AuthorDate: Tue Apr 9 19:00:33 2024 +0800 fix --- .../execution/exchange/sink/ShuffleSinkHandle.java | 4 ++-- .../execution/exchange/source/LocalSourceHandle.java | 9 --------- .../iotdb/db/queryengine/execution/memory/MemoryPool.java | 14 +++++++------- 3 files changed, 9 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java index 7c31ab67e04..b9bf277b68b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java @@ -188,7 +188,6 @@ public class ShuffleSinkHandle implements ISinkHandle { if (aborted || closed) { return; } - aborted = true; if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartAbortShuffleSinkHandle]"); } @@ -208,6 +207,7 @@ public class ShuffleSinkHandle implements ISinkHandle { LOGGER.warn("Error occurred when try to abort channel.", firstException); } sinkListener.onAborted(this); + aborted = true; if (LOGGER.isDebugEnabled()) { LOGGER.debug("[EndAbortShuffleSinkHandle]"); } @@ -222,7 +222,6 @@ public class ShuffleSinkHandle implements ISinkHandle { if (closed || aborted) { return; } - closed = true; if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartCloseShuffleSinkHandle]"); } @@ -242,6 +241,7 @@ public class ShuffleSinkHandle implements ISinkHandle { LOGGER.warn("Error occurred when try to close channel.", firstException); } sinkListener.onFinish(this); + closed = true; if (LOGGER.isDebugEnabled()) { LOGGER.debug("[EndCloseShuffleSinkHandle]"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java index de37c64e460..7c6b56fd5ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java @@ -183,9 +183,6 @@ public class LocalSourceHandle implements ISourceHandle { @Override public void abort() { - if (aborted || closed) { - return; - } try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartAbortLocalSourceHandle]"); @@ -208,9 +205,6 @@ public class LocalSourceHandle implements ISourceHandle { @Override public void abort(Throwable t) { - if (aborted || closed) { - return; - } try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartAbortLocalSourceHandle]"); @@ -233,9 +227,6 @@ public class LocalSourceHandle implements ISourceHandle { @Override public void close() { - if (aborted || closed) { - return; - } try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[StartCloseLocalSourceHandle]"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java index 3b8adc70d44..c43ba99c63a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java @@ -223,9 +223,9 @@ public class MemoryPool { String planNodeId, long bytesToReserve, long maxBytesCanReserve) { - Validate.notNull(queryId); - Validate.notNull(fragmentInstanceId); - Validate.notNull(planNodeId); + Validate.notNull(queryId, "queryId can not be null."); + Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be null."); + Validate.notNull(planNodeId, "planNodeId can not be null."); Validate.isTrue( bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance, "bytesToReserve should be in (0,maxBytesPerFI]. maxBytesPerFI: %d", @@ -263,9 +263,9 @@ public class MemoryPool { String planNodeId, long bytesToReserve, long maxBytesCanReserve) { - Validate.notNull(queryId); - Validate.notNull(fragmentInstanceId); - Validate.notNull(planNodeId); + Validate.notNull(queryId, "queryId can not be null."); + Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be null."); + Validate.notNull(planNodeId, "planNodeId can not be null."); Validate.isTrue( bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance, "bytesToReserve should be in (0,maxBytesPerFI]. maxBytesPerFI: %d", @@ -288,10 +288,10 @@ public class MemoryPool { */ @SuppressWarnings("squid:S2445") public synchronized long tryCancel(ListenableFuture<Void> future) { + Validate.notNull(future, "The future to be cancelled can not be null."); // add synchronized on the future to avoid that the future is concurrently completed by // MemoryPool.free() which may lead to memory leak. synchronized (future) { - Validate.notNull(future, "The future to be cancelled can not be null."); // If the future is not a MemoryReservationFuture, it must have been completed. if (future.isDone()) { return 0L;
