This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a18c7805052 Fix the status code of fetching schema when memory is not
enough
a18c7805052 is described below
commit a18c780505247f736ccd37aa4a88fc53dab66159
Author: shuwenwei <[email protected]>
AuthorDate: Fri Aug 15 10:13:02 2025 +0800
Fix the status code of fetching schema when memory is not enough
---
.../execution/operator/schema/SchemaFetchScanOperator.java | 10 ++++++++--
.../org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java | 7 ++++++-
.../plan/analyze/schema/ClusterSchemaFetchExecutor.java | 3 +++
.../java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java | 4 +++-
4 files changed, 20 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
index 6cc8e06878c..9a8dbf9f21b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
@@ -222,11 +222,17 @@ public class SchemaFetchScanOperator implements
SourceOperator {
schemaNodeIteratorForSerialize = schemaTree.getIteratorForSerialize();
baos = new PublicBAOS(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES +
EXTRA_SIZE_TO_AVOID_GROW);
if (operatorContext != null) {
- schemaTreeMemCost = schemaTree.ramBytesUsed();
+ long ramBytesUsed = schemaTree.ramBytesUsed();
operatorContext
.getInstanceContext()
.getMemoryReservationContext()
- .reserveMemoryCumulatively(schemaTreeMemCost);
+ .reserveMemoryCumulatively(ramBytesUsed);
+ // For temporary and independently counted memory, we need to process it
immediately
+ operatorContext
+ .getInstanceContext()
+ .getMemoryReservationContext()
+ .reserveMemoryImmediately();
+ this.schemaTreeMemCost = ramBytesUsed;
}
} catch (MetadataException e) {
throw new SchemaExecutionException(e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
index 8641e4631c2..b3517a6086f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
@@ -45,7 +45,12 @@ public class Analyzer {
long startTime = System.nanoTime();
AnalyzeVisitor visitor = new AnalyzeVisitor(partitionFetcher,
schemaFetcher);
Analysis analysis = null;
-
context.setReserveMemoryForSchemaTreeFunc(context::reserveMemoryForFrontEnd);
+ context.setReserveMemoryForSchemaTreeFunc(
+ mem -> {
+ context.reserveMemoryForFrontEnd(mem);
+ // For temporary and independently counted memory, we need to process
it immediately
+ context.reserveMemoryForFrontEndImmediately();
+ });
try {
analysis = visitor.process(statement, context);
} finally {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index b959e05e1c5..637516ef83a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
@@ -326,6 +327,8 @@ class ClusterSchemaFetchExecutor {
throw new RuntimeException(
new MetadataException("Failed to fetch schema because of
unrecognized data"));
}
+ } catch (MemoryNotEnoughException e) {
+ throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index f8c709e5773..46544fc9834 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -163,7 +163,9 @@ public class ErrorHandlingUtils {
TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR +
rootCause.getMessage());
} else if (t instanceof RootFIPlacementException
|| t instanceof ReplicaSetUnreachableException
- || t instanceof QuerySchemaFetchFailedException) {
+ || (t instanceof QuerySchemaFetchFailedException
+ && ((QuerySchemaFetchFailedException) t).getErrorCode()
+ !=
TSStatusCode.QUERY_EXECUTION_MEMORY_NOT_ENOUGH.getStatusCode())) {
return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION,
rootCause.getMessage());
} else if (t instanceof IoTDBException) {
return Objects.nonNull(((IoTDBException) t).getStatus())