This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/object_type by this push:
     new 85ef82d9802 allocate memory
85ef82d9802 is described below

commit 85ef82d980237c18fa9f0fc87d93d0e5a78ae263
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jul 4 10:26:55 2025 +0800

    allocate memory
---
 .../relational/ColumnTransformerBuilder.java       | 19 +++++++++++++----
 .../plan/planner/TableOperatorGenerator.java       | 23 +++++++++++++--------
 .../plan/relational/sql/ast/DeleteDevice.java      |  3 ++-
 .../unary/scalar/ReadObjectColumnTransformer.java  | 24 +++++++++++++++++++---
 .../schemaregion/impl/SchemaRegionMemoryImpl.java  |  6 ++++--
 5 files changed, 56 insertions(+), 19 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
index 48586f6a34c..d766570e903 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import 
org.apache.iotdb.db.queryengine.plan.relational.function.arithmetic.AdditionResolver;
@@ -178,6 +179,8 @@ import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.read.common.type.TypeEnum;
 import org.apache.tsfile.utils.Binary;
 
+import javax.annotation.Nullable;
+
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1006,10 +1009,13 @@ public class ColumnTransformerBuilder
         .equalsIgnoreCase(functionName)) {
       ColumnTransformer first = this.process(children.get(0), context);
       if (children.size() == 1) {
-        return new ReadObjectColumnTransformer(OBJECT, first);
+        return new ReadObjectColumnTransformer(OBJECT, first, 
context.fragmentInstanceContext);
       } else if (children.size() == 2) {
         return new ReadObjectColumnTransformer(
-            OBJECT, ((LongLiteral) children.get(1)).getParsedValue(), first);
+            OBJECT,
+            ((LongLiteral) children.get(1)).getParsedValue(),
+            first,
+            context.fragmentInstanceContext);
       } else {
         long offset = ((LongLiteral) children.get(1)).getParsedValue();
         long length = ((LongLiteral) children.get(2)).getParsedValue();
@@ -1018,7 +1024,8 @@ public class ColumnTransformerBuilder
             OBJECT,
             ((LongLiteral) children.get(1)).getParsedValue(),
             ((LongLiteral) children.get(2)).getParsedValue(),
-            first);
+            first,
+            context.fragmentInstanceContext);
       }
     } else {
       // user defined function
@@ -1484,6 +1491,8 @@ public class ColumnTransformerBuilder
 
     private final Metadata metadata;
 
+    private final Optional<FragmentInstanceContext> fragmentInstanceContext;
+
     public Context(
         SessionInfo sessionInfo,
         List<LeafColumnTransformer> leafList,
@@ -1494,7 +1503,8 @@ public class ColumnTransformerBuilder
         List<TSDataType> inputDataTypes,
         int originSize,
         TypeProvider typeProvider,
-        Metadata metadata) {
+        Metadata metadata,
+        @Nullable FragmentInstanceContext fragmentInstanceContext) {
       this.sessionInfo = sessionInfo;
       this.leafList = leafList;
       this.inputLocations = inputLocations;
@@ -1505,6 +1515,7 @@ public class ColumnTransformerBuilder
       this.originSize = originSize;
       this.typeProvider = typeProvider;
       this.metadata = metadata;
+      this.fragmentInstanceContext = 
Optional.ofNullable(fragmentInstanceContext);
     }
 
     public Type getType(SymbolReference symbolReference) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 132f5ebcc00..811af011a90 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannel
 import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
 import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.queryengine.execution.operator.EmptyDataOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.ExplainAnalyzeOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
@@ -1311,13 +1312,15 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
 
     ColumnTransformerBuilder visitor = new ColumnTransformerBuilder();
 
+    FragmentInstanceContext fragmentInstanceContext =
+        context.getDriverContext().getFragmentInstanceContext();
     ColumnTransformer filterOutputTransformer =
         predicate
             .map(
                 p -> {
                   ColumnTransformerBuilder.Context 
filterColumnTransformerContext =
                       new ColumnTransformerBuilder.Context(
-                          
context.getDriverContext().getFragmentInstanceContext().getSessionInfo(),
+                          fragmentInstanceContext.getSessionInfo(),
                           filterLeafColumnTransformerList,
                           inputLocations,
                           filterExpressionColumnTransformerMap,
@@ -1326,7 +1329,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
                           ImmutableList.of(),
                           0,
                           context.getTypeProvider(),
-                          metadata);
+                          metadata,
+                          fragmentInstanceContext);
 
                   return visitor.process(p, filterColumnTransformerContext);
                 })
@@ -1344,7 +1348,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
 
     ColumnTransformerBuilder.Context projectColumnTransformerContext =
         new ColumnTransformerBuilder.Context(
-            
context.getDriverContext().getFragmentInstanceContext().getSessionInfo(),
+            fragmentInstanceContext.getSessionInfo(),
             projectLeafColumnTransformerList,
             inputLocations,
             projectExpressionColumnTransformerMap,
@@ -1353,7 +1357,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             filterOutputDataTypes,
             inputLocations.size(),
             context.getTypeProvider(),
-            metadata);
+            metadata,
+            fragmentInstanceContext);
 
     for (Expression expression : projectExpressions) {
       projectOutputTransformerList.add(
@@ -2426,6 +2431,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
 
     // In "count" we have to reuse filter operator per "next"
     final List<LeafColumnTransformer> filterLeafColumnTransformerList = new 
ArrayList<>();
+    FragmentInstanceContext fragmentInstanceContext =
+        context.getDriverContext().getFragmentInstanceContext();
     return new SchemaCountOperator<>(
         node.getPlanNodeId(),
         context
@@ -2447,10 +2454,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
                         .process(
                             node.getTagFuzzyPredicate(),
                             new ColumnTransformerBuilder.Context(
-                                context
-                                    .getDriverContext()
-                                    .getFragmentInstanceContext()
-                                    .getSessionInfo(),
+                                fragmentInstanceContext.getSessionInfo(),
                                 filterLeafColumnTransformerList,
                                 makeLayout(Collections.singletonList(node)),
                                 new HashMap<>(),
@@ -2459,7 +2463,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
                                 ImmutableList.of(),
                                 0,
                                 context.getTypeProvider(),
-                                metadata)),
+                                metadata,
+                                fragmentInstanceContext)),
                     columnSchemaList,
                     database,
                     table)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java
index 10935e97d4e..4a73f4b792f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java
@@ -227,7 +227,8 @@ public class DeleteDevice extends AbstractTraverseDevice {
                     ImmutableList.of(),
                     0,
                     mockTypeProvider,
-                    metadata))
+                    metadata,
+                    null))
             : null;
 
     return new DeviceBlackListConstructor(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
index 53de974e895..6332ab43b3b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
 
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -35,27 +36,42 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
+import java.util.Optional;
 
 public class ReadObjectColumnTransformer extends UnaryColumnTransformer {
 
+  private final Optional<FragmentInstanceContext> fragmentInstanceContext;
   private long offset = 0;
   private long length = -1;
 
-  public ReadObjectColumnTransformer(Type type, ColumnTransformer 
childColumnTransformer) {
+  public ReadObjectColumnTransformer(
+      Type type,
+      ColumnTransformer childColumnTransformer,
+      Optional<FragmentInstanceContext> fragmentInstanceContext) {
     super(type, childColumnTransformer);
+    this.fragmentInstanceContext = fragmentInstanceContext;
   }
 
   public ReadObjectColumnTransformer(
-      Type type, long offset, ColumnTransformer childColumnTransformer) {
+      Type type,
+      long offset,
+      ColumnTransformer childColumnTransformer,
+      Optional<FragmentInstanceContext> fragmentInstanceContext) {
     super(type, childColumnTransformer);
     this.offset = offset;
+    this.fragmentInstanceContext = fragmentInstanceContext;
   }
 
   public ReadObjectColumnTransformer(
-      Type type, long offset, long length, ColumnTransformer 
childColumnTransformer) {
+      Type type,
+      long offset,
+      long length,
+      ColumnTransformer childColumnTransformer,
+      Optional<FragmentInstanceContext> fragmentInstanceContext) {
     super(type, childColumnTransformer);
     this.offset = offset;
     this.length = length;
+    this.fragmentInstanceContext = fragmentInstanceContext;
   }
 
   @Override
@@ -98,6 +114,8 @@ public class ReadObjectColumnTransformer extends 
UnaryColumnTransformer {
     if (actualReadSize > Integer.MAX_VALUE) {
       throw new UnsupportedOperationException("Read object size is too large 
(size > 2G)");
     }
+    fragmentInstanceContext.ifPresent(
+        context -> 
context.getMemoryReservationContext().reserveMemoryCumulatively(actualReadSize));
     byte[] bytes = new byte[(int) actualReadSize];
     ByteBuffer buffer = ByteBuffer.wrap(bytes);
     try (FileChannel fileChannel = FileChannel.open(file.toPath(), 
StandardOpenOption.READ)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 3f49dfec93d..d09f2204db8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -1523,7 +1523,8 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
                     ImmutableList.of(),
                     0,
                     mockTypeProvider,
-                    metadata))
+                    metadata,
+                    null))
             : null;
 
     final List<TSDataType> filterOutputDataTypes =
@@ -1550,7 +1551,8 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
             filterOutputDataTypes,
             inputLocations.size(),
             mockTypeProvider,
-            metadata);
+            metadata,
+            null);
 
     final List<String> attributeNames =
         updateNode.getAssignments().stream()

Reply via email to