This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7f34bc03129 Specify error msg when DriverTask is aborted by MemoryNotEnoughException
7f34bc03129 is described below
commit 7f34bc03129f8a9e477211e66bcbafb8bdab8c1a
Author: Liao Lanyu <[email protected]>
AuthorDate: Tue Jun 25 13:59:09 2024 +0800
Specify error msg when DriverTask is aborted by MemoryNotEnoughException
---
.../queryengine/execution/schedule/AbstractDriverThread.java | 12 +++++++++++-
.../execution/schedule/DriverTaskAbortedException.java | 3 +++
.../dataregion/read/reader/common/PriorityMergeReader.java | 3 +++
3 files changed, 17 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
index c433f0d2925..381c9b9378a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.queryengine.execution.schedule;
+import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import
org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;
+import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.slf4j.Logger;
@@ -77,7 +79,7 @@ public abstract class AbstractDriverThread extends Thread
implements Closeable {
try (SetThreadName driverTaskName =
new
SetThreadName(next.getDriver().getDriverTaskId().getFullId())) {
logger.warn("[ExecuteFailed]", e);
- next.setAbortCause(DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
+ next.setAbortCause(getAbortCause(e));
scheduler.toAborted(next);
}
} finally {
@@ -113,4 +115,12 @@ public abstract class AbstractDriverThread extends Thread
implements Closeable {
public void close() throws IOException {
closed = true;
}
+
+ private String getAbortCause(final Exception e) {
+ Throwable rootCause = ErrorHandlingUtils.getRootCause(e);
+ if (rootCause instanceof MemoryNotEnoughException) {
+ return DriverTaskAbortedException.BY_MEMORY_NOT_ENOUGH;
+ }
+ return DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java
index 1f066e736d5..aac6d9fdebe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java
@@ -30,6 +30,9 @@ public class DriverTaskAbortedException extends Exception {
public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error scheduled";
+ public static final String BY_MEMORY_NOT_ENOUGH =
+ "Memory is not enough to execute the query task.";
+
public DriverTaskAbortedException(String driverTaskName, String causeMsg) {
super(String.format("DriverTask %s is aborted by %s", driverTaskName,
causeMsg));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
index a7c96131acf..7fc89dce439 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
@@ -182,6 +182,9 @@ public class PriorityMergeReader implements IPointReader {
Element e = heap.poll();
e.close();
}
+ if (memoryReservationManager != null) {
+ memoryReservationManager.releaseMemoryCumulatively(usedMemorySize);
+ }
usedMemorySize = 0;
}