This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new 85ef82d9802 allocate memory
85ef82d9802 is described below
commit 85ef82d980237c18fa9f0fc87d93d0e5a78ae263
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jul 4 10:26:55 2025 +0800
allocate memory
---
.../relational/ColumnTransformerBuilder.java | 19 +++++++++++++----
.../plan/planner/TableOperatorGenerator.java | 23 +++++++++++++--------
.../plan/relational/sql/ast/DeleteDevice.java | 3 ++-
.../unary/scalar/ReadObjectColumnTransformer.java | 24 +++++++++++++++++++---
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 6 ++++--
5 files changed, 56 insertions(+), 19 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
index 48586f6a34c..d766570e903 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.plan.relational.function.arithmetic.AdditionResolver;
@@ -178,6 +179,8 @@ import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.read.common.type.TypeEnum;
import org.apache.tsfile.utils.Binary;
+import javax.annotation.Nullable;
+
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1006,10 +1009,13 @@ public class ColumnTransformerBuilder
.equalsIgnoreCase(functionName)) {
ColumnTransformer first = this.process(children.get(0), context);
if (children.size() == 1) {
- return new ReadObjectColumnTransformer(OBJECT, first);
+ return new ReadObjectColumnTransformer(OBJECT, first,
context.fragmentInstanceContext);
} else if (children.size() == 2) {
return new ReadObjectColumnTransformer(
- OBJECT, ((LongLiteral) children.get(1)).getParsedValue(), first);
+ OBJECT,
+ ((LongLiteral) children.get(1)).getParsedValue(),
+ first,
+ context.fragmentInstanceContext);
} else {
long offset = ((LongLiteral) children.get(1)).getParsedValue();
long length = ((LongLiteral) children.get(2)).getParsedValue();
@@ -1018,7 +1024,8 @@ public class ColumnTransformerBuilder
OBJECT,
((LongLiteral) children.get(1)).getParsedValue(),
((LongLiteral) children.get(2)).getParsedValue(),
- first);
+ first,
+ context.fragmentInstanceContext);
}
} else {
// user defined function
@@ -1484,6 +1491,8 @@ public class ColumnTransformerBuilder
private final Metadata metadata;
+ private final Optional<FragmentInstanceContext> fragmentInstanceContext;
+
public Context(
SessionInfo sessionInfo,
List<LeafColumnTransformer> leafList,
@@ -1494,7 +1503,8 @@ public class ColumnTransformerBuilder
List<TSDataType> inputDataTypes,
int originSize,
TypeProvider typeProvider,
- Metadata metadata) {
+ Metadata metadata,
+ @Nullable FragmentInstanceContext fragmentInstanceContext) {
this.sessionInfo = sessionInfo;
this.leafList = leafList;
this.inputLocations = inputLocations;
@@ -1505,6 +1515,7 @@ public class ColumnTransformerBuilder
this.originSize = originSize;
this.typeProvider = typeProvider;
this.metadata = metadata;
+ this.fragmentInstanceContext =
Optional.ofNullable(fragmentInstanceContext);
}
public Type getType(SymbolReference symbolReference) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 132f5ebcc00..811af011a90 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannel
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
import
org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.operator.EmptyDataOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.ExplainAnalyzeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
@@ -1311,13 +1312,15 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
ColumnTransformerBuilder visitor = new ColumnTransformerBuilder();
+ FragmentInstanceContext fragmentInstanceContext =
+ context.getDriverContext().getFragmentInstanceContext();
ColumnTransformer filterOutputTransformer =
predicate
.map(
p -> {
ColumnTransformerBuilder.Context
filterColumnTransformerContext =
new ColumnTransformerBuilder.Context(
-
context.getDriverContext().getFragmentInstanceContext().getSessionInfo(),
+ fragmentInstanceContext.getSessionInfo(),
filterLeafColumnTransformerList,
inputLocations,
filterExpressionColumnTransformerMap,
@@ -1326,7 +1329,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
ImmutableList.of(),
0,
context.getTypeProvider(),
- metadata);
+ metadata,
+ fragmentInstanceContext);
return visitor.process(p, filterColumnTransformerContext);
})
@@ -1344,7 +1348,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
ColumnTransformerBuilder.Context projectColumnTransformerContext =
new ColumnTransformerBuilder.Context(
-
context.getDriverContext().getFragmentInstanceContext().getSessionInfo(),
+ fragmentInstanceContext.getSessionInfo(),
projectLeafColumnTransformerList,
inputLocations,
projectExpressionColumnTransformerMap,
@@ -1353,7 +1357,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
filterOutputDataTypes,
inputLocations.size(),
context.getTypeProvider(),
- metadata);
+ metadata,
+ fragmentInstanceContext);
for (Expression expression : projectExpressions) {
projectOutputTransformerList.add(
@@ -2426,6 +2431,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
// In "count" we have to reuse filter operator per "next"
final List<LeafColumnTransformer> filterLeafColumnTransformerList = new
ArrayList<>();
+ FragmentInstanceContext fragmentInstanceContext =
+ context.getDriverContext().getFragmentInstanceContext();
return new SchemaCountOperator<>(
node.getPlanNodeId(),
context
@@ -2447,10 +2454,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
.process(
node.getTagFuzzyPredicate(),
new ColumnTransformerBuilder.Context(
- context
- .getDriverContext()
- .getFragmentInstanceContext()
- .getSessionInfo(),
+ fragmentInstanceContext.getSessionInfo(),
filterLeafColumnTransformerList,
makeLayout(Collections.singletonList(node)),
new HashMap<>(),
@@ -2459,7 +2463,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
ImmutableList.of(),
0,
context.getTypeProvider(),
- metadata)),
+ metadata,
+ fragmentInstanceContext)),
columnSchemaList,
database,
table)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java
index 10935e97d4e..4a73f4b792f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java
@@ -227,7 +227,8 @@ public class DeleteDevice extends AbstractTraverseDevice {
ImmutableList.of(),
0,
mockTypeProvider,
- metadata))
+ metadata,
+ null))
: null;
return new DeviceBlackListConstructor(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
index 53de974e895..6332ab43b3b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -35,27 +36,42 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
+import java.util.Optional;
public class ReadObjectColumnTransformer extends UnaryColumnTransformer {
+ private final Optional<FragmentInstanceContext> fragmentInstanceContext;
private long offset = 0;
private long length = -1;
- public ReadObjectColumnTransformer(Type type, ColumnTransformer
childColumnTransformer) {
+ public ReadObjectColumnTransformer(
+ Type type,
+ ColumnTransformer childColumnTransformer,
+ Optional<FragmentInstanceContext> fragmentInstanceContext) {
super(type, childColumnTransformer);
+ this.fragmentInstanceContext = fragmentInstanceContext;
}
public ReadObjectColumnTransformer(
- Type type, long offset, ColumnTransformer childColumnTransformer) {
+ Type type,
+ long offset,
+ ColumnTransformer childColumnTransformer,
+ Optional<FragmentInstanceContext> fragmentInstanceContext) {
super(type, childColumnTransformer);
this.offset = offset;
+ this.fragmentInstanceContext = fragmentInstanceContext;
}
public ReadObjectColumnTransformer(
- Type type, long offset, long length, ColumnTransformer
childColumnTransformer) {
+ Type type,
+ long offset,
+ long length,
+ ColumnTransformer childColumnTransformer,
+ Optional<FragmentInstanceContext> fragmentInstanceContext) {
super(type, childColumnTransformer);
this.offset = offset;
this.length = length;
+ this.fragmentInstanceContext = fragmentInstanceContext;
}
@Override
@@ -98,6 +114,8 @@ public class ReadObjectColumnTransformer extends
UnaryColumnTransformer {
if (actualReadSize > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Read object size is too large
(size > 2G)");
}
+ fragmentInstanceContext.ifPresent(
+ context ->
context.getMemoryReservationContext().reserveMemoryCumulatively(actualReadSize));
byte[] bytes = new byte[(int) actualReadSize];
ByteBuffer buffer = ByteBuffer.wrap(bytes);
try (FileChannel fileChannel = FileChannel.open(file.toPath(),
StandardOpenOption.READ)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 3f49dfec93d..d09f2204db8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -1523,7 +1523,8 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
ImmutableList.of(),
0,
mockTypeProvider,
- metadata))
+ metadata,
+ null))
: null;
final List<TSDataType> filterOutputDataTypes =
@@ -1550,7 +1551,8 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
filterOutputDataTypes,
inputLocations.size(),
mockTypeProvider,
- metadata);
+ metadata,
+ null);
final List<String> attributeNames =
updateNode.getAssignments().stream()