This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 743e2e738b1 Optimize group by query in ClientRPCServiceImpl to reduce cpu usage (#15178)
743e2e738b1 is described below
commit 743e2e738b16b67a1705c95f12951d6a724cc40c
Author: Beyyes <[email protected]>
AuthorDate: Wed Apr 9 13:59:43 2025 +0800
Optimize group by query in ClientRPCServiceImpl to reduce cpu usage (#15178)
---
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 69 +++++++-------
.../execution/aggregation/Aggregator.java | 37 +++----
.../execution/driver/DriverContext.java | 6 ++
.../fragment/FakedFragmentInstanceContext.java | 106 +++++++++++++++++++++
.../fragment/FragmentInstanceContext.java | 19 ++--
.../execution/fragment/QueryContext.java | 2 +-
.../execution/schedule/task/DriverTaskId.java | 3 +-
.../memory/FakedMemoryReservationManager.java | 35 +++++++
.../dataregion/tsfile/TsFileResourceList.java | 2 +-
.../thrift-datanode/src/main/thrift/client.thrift | 1 +
10 files changed, 212 insertions(+), 68 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 4c2c4a4e654..3c710d0a82f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -53,9 +53,6 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.protocol.thrift.OperationType;
-import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
-import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
-import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
@@ -63,14 +60,12 @@ import
org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
-import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
-import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
-import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FakedFragmentInstanceContext;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesAggregationScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregationScanOperator;
-import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -109,6 +104,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSch
import org.apache.iotdb.db.schemaengine.template.TemplateQueryType;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
import org.apache.iotdb.db.storageengine.rescon.quotas.OperationQuota;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
@@ -205,7 +201,6 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
import static
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
-import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
import static
org.apache.iotdb.db.utils.CommonUtils.getContentOfTSFastLastDataQueryForOneDeviceReq;
@@ -244,7 +239,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE =
DataNodeSchemaCache.getInstance();
- public static Duration DEFAULT_TIME_SLICE = new Duration(60_000,
TimeUnit.MILLISECONDS);
+ public static final Duration DEFAULT_TIME_SLICE = new Duration(60_000,
TimeUnit.MILLISECONDS);
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
@@ -655,6 +650,9 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
}
+ private final List<InputLocation[]> inputLocationList =
+ Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)});
+
@SuppressWarnings("java:S2095") // close() do nothing
private List<TsBlock> executeGroupByQueryInternal(
SessionInfo sessionInfo,
@@ -677,21 +675,14 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
Filter timeFilter = TimeFilterApi.between(startTime, endTime - 1);
- QueryId queryId = new QueryId("stub_query");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
- FragmentInstanceStateMachine stateMachine =
- new FragmentInstanceStateMachine(
- instanceId,
FragmentInstanceManager.getInstance().instanceNotificationExecutor);
- FragmentInstanceContext fragmentInstanceContext =
- createFragmentInstanceContext(
- instanceId, stateMachine, sessionInfo, dataRegionList.get(0),
timeFilter);
+ FakedFragmentInstanceContext fragmentInstanceContext =
+ new FakedFragmentInstanceContext(timeFilter, dataRegionList.get(0));
+
DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
PlanNodeId planNodeId = new PlanNodeId("1");
- driverContext.addOperatorContext(1, planNodeId,
SeriesScanOperator.class.getSimpleName());
- driverContext
- .getOperatorContexts()
- .forEach(operatorContext ->
operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));
+ OperatorContext operatorContext =
+ new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator",
driverContext);
+ operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE);
SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
@@ -709,7 +700,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
true,
true),
AggregationStep.SINGLE,
- Collections.singletonList(new InputLocation[] {new
InputLocation(0, 0)}));
+ inputLocationList);
GroupByTimeParameter groupByTimeParameter =
new GroupByTimeParameter(
@@ -717,11 +708,15 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
IMeasurementSchema measurementSchema = new MeasurementSchema(measurement,
dataType);
AbstractSeriesAggregationScanOperator operator;
+ boolean canUseStatistics =
+ !TSDataType.BLOB.equals(dataType)
+ || (!TAggregationType.LAST_VALUE.equals(aggregationType)
+ && !TAggregationType.FIRST_VALUE.equals(aggregationType));
PartialPath path;
if (isAligned) {
path =
new AlignedPath(
- device,
+ device.split("\\."),
Collections.singletonList(measurement),
Collections.singletonList(measurementSchema));
operator =
@@ -730,36 +725,36 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
(AlignedPath) path,
Ordering.ASC,
scanOptionsBuilder.build(),
- driverContext.getOperatorContexts().get(0),
+ operatorContext,
Collections.singletonList(aggregator),
initTimeRangeIterator(groupByTimeParameter, true, true,
sessionInfo.getZoneId()),
groupByTimeParameter,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
- !TSDataType.BLOB.equals(dataType)
- || (!TAggregationType.LAST_VALUE.equals(aggregationType)
- &&
!TAggregationType.FIRST_VALUE.equals(aggregationType)));
+ canUseStatistics);
} else {
- path = new MeasurementPath(device, measurement, measurementSchema);
+ String[] splits = device.split("\\.");
+ String[] fullPaths = new String[splits.length + 1];
+ System.arraycopy(splits, 0, fullPaths, 0, splits.length);
+ fullPaths[splits.length] = measurement;
+ path = new MeasurementPath(fullPaths, measurementSchema);
operator =
new SeriesAggregationScanOperator(
planNodeId,
path,
Ordering.ASC,
scanOptionsBuilder.build(),
- driverContext.getOperatorContexts().get(0),
+ operatorContext,
Collections.singletonList(aggregator),
initTimeRangeIterator(groupByTimeParameter, true, true,
sessionInfo.getZoneId()),
groupByTimeParameter,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
- !TSDataType.BLOB.equals(dataType)
- || (!TAggregationType.LAST_VALUE.equals(aggregationType)
- &&
!TAggregationType.FIRST_VALUE.equals(aggregationType)));
+ canUseStatistics);
}
try {
List<TsBlock> result = new ArrayList<>();
- fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
-
operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());
+ QueryDataSource dataSource =
fragmentInstanceContext.getSharedQueryDataSource(path);
+ operator.initQueryDataSource(dataSource);
while (operator.hasNext()) {
result.add(operator.next());
@@ -769,7 +764,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
- fragmentInstanceContext.releaseResource();
+ fragmentInstanceContext.releaseSharedQueryDataSource();
}
}
@@ -1075,7 +1070,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
deviceId,
measurementId,
dataType,
- true,
+ req.isAligned,
req.getStartTime(),
req.getEndTime(),
req.getInterval(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
index 6992d80b53f..fb4fd904036 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
@@ -64,29 +64,22 @@ public class Aggregator {
// Used for SeriesAggregateScanOperator and RawDataAggregateOperator
public void processTsBlock(TsBlock tsBlock, BitMap bitMap) {
- long startTime = System.nanoTime();
- try {
- checkArgument(
- step.isInputRaw(),
- "Step in SeriesAggregateScanOperator and RawDataAggregateOperator
can only process raw input");
- for (InputLocation[] inputLocations : inputLocationList) {
- Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
- timeAndValueColumn[0] = tsBlock.getTimeColumn();
- for (int i = 0; i < inputLocations.length; i++) {
- checkArgument(
- inputLocations[i].getTsBlockIndex() == 0,
- "RawDataAggregateOperator can only process one tsBlock input.");
- int index = inputLocations[i].getValueColumnIndex();
- // for count_time, time column is also its value column
- // for max_by, the input column can also be time column.
- timeAndValueColumn[1 + i] =
- index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
- }
- accumulator.addInput(timeAndValueColumn, bitMap);
+ checkArgument(
+ step.isInputRaw(),
+ "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can
only process raw input");
+ for (InputLocation[] inputLocations : inputLocationList) {
+ Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
+ timeAndValueColumn[0] = tsBlock.getTimeColumn();
+ for (int i = 0; i < inputLocations.length; i++) {
+ checkArgument(
+ inputLocations[i].getTsBlockIndex() == 0,
+ "RawDataAggregateOperator can only process one tsBlock input.");
+ int index = inputLocations[i].getValueColumnIndex();
+ // for count_time, time column is also its value column
+ // for max_by, the input column can also be time column.
+ timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] :
tsBlock.getColumn(index);
}
- } finally {
- QUERY_EXECUTION_METRICS.recordExecutionCost(
- AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+ accumulator.addInput(timeAndValueColumn, bitMap);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
index 79cb4acf3b9..b16de9d633b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
@@ -50,6 +50,12 @@ public class DriverContext {
this.fragmentInstanceContext = null;
}
+ @TestOnly
+ // should only be used by executeGroupByQueryInternal
+ public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
+ this.fragmentInstanceContext = fragmentInstanceContext;
+ }
+
public DriverContext(FragmentInstanceContext fragmentInstanceContext, int
pipelineId) {
this.fragmentInstanceContext = fragmentInstanceContext;
this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(),
pipelineId);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
new file mode 100644
index 00000000000..a91e41d2379
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.FakedMemoryReservationManager;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.read.filter.basic.Filter;
+
+import java.util.Collections;
+import java.util.List;
+
+public class FakedFragmentInstanceContext extends FragmentInstanceContext {
+
+ public FakedFragmentInstanceContext(Filter timeFilter, DataRegion
dataRegion) {
+ super(0, new FakedMemoryReservationManager(), timeFilter, dataRegion);
+ }
+
+ public QueryDataSource getSharedQueryDataSource(PartialPath sourcePath)
+ throws QueryProcessException {
+ if (sharedQueryDataSource == null) {
+ initQueryDataSource(sourcePath);
+ }
+ return (QueryDataSource) sharedQueryDataSource;
+ }
+
+ public void initQueryDataSource(PartialPath sourcePath) throws
QueryProcessException {
+
+ dataRegion.readLock();
+ try {
+ this.sharedQueryDataSource =
+ dataRegion.query(
+ Collections.singletonList(sourcePath),
+ sourcePath.getDevice(),
+ this,
+ getGlobalTimeFilter(),
+ null);
+
+ // used files should be added before mergeLock is unlocked, or they may
be deleted by
+ // running merge
+ if (sharedQueryDataSource != null) {
+ ((QueryDataSource) sharedQueryDataSource).setSingleDevice(true);
+ List<TsFileResource> tsFileList =
+ ((QueryDataSource) sharedQueryDataSource).getSeqResources();
+ if (tsFileList != null) {
+ for (TsFileResource tsFile : tsFileList) {
+
FileReaderManager.getInstance().increaseFileReaderReference(tsFile,
tsFile.isClosed());
+ }
+ }
+ tsFileList = ((QueryDataSource)
sharedQueryDataSource).getUnseqResources();
+ if (tsFileList != null) {
+ for (TsFileResource tsFile : tsFileList) {
+
FileReaderManager.getInstance().increaseFileReaderReference(tsFile,
tsFile.isClosed());
+ }
+ }
+ }
+ } finally {
+ dataRegion.readUnlock();
+ }
+ }
+
+ public void releaseSharedQueryDataSource() {
+ if (sharedQueryDataSource != null) {
+ List<TsFileResource> tsFileList = ((QueryDataSource)
sharedQueryDataSource).getSeqResources();
+ if (tsFileList != null) {
+ for (TsFileResource tsFile : tsFileList) {
+ FileReaderManager.getInstance().decreaseFileReaderReference(tsFile,
tsFile.isClosed());
+ }
+ }
+ tsFileList = ((QueryDataSource)
sharedQueryDataSource).getUnseqResources();
+ if (tsFileList != null) {
+ for (TsFileResource tsFile : tsFileList) {
+ FileReaderManager.getInstance().decreaseFileReaderReference(tsFile,
tsFile.isClosed());
+ }
+ }
+ sharedQueryDataSource = null;
+ }
+ }
+
+ @Override
+ protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
+ return false;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 7e2c144b0fb..ae63887e645 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import
org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
@@ -72,16 +73,16 @@ public class FragmentInstanceContext extends QueryContext {
private final MemoryReservationManager memoryReservationManager;
- private IDataRegionForQuery dataRegion;
+ protected IDataRegionForQuery dataRegion;
private Filter globalTimeFilter;
// it will only be used once, after sharedQueryDataSource being inited, it
will be set to null
- private List<PartialPath> sourcePaths;
+ protected List<PartialPath> sourcePaths;
// Used for region scan.
private Map<IDeviceID, DeviceContext> devicePathsToContext;
// Shared by all scan operators in this fragment instance to avoid memory
problem
- private IQueryDataSource sharedQueryDataSource;
+ protected IQueryDataSource sharedQueryDataSource;
/** closed tsfile used in this fragment instance. */
private Set<TsFileResource> closedFilePaths;
@@ -168,7 +169,7 @@ public class FragmentInstanceContext extends QueryContext {
}
public static FragmentInstanceContext
createFragmentInstanceContextForCompaction(long queryId) {
- return new FragmentInstanceContext(queryId);
+ return new FragmentInstanceContext(queryId, null, null, null);
}
public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) {
@@ -271,13 +272,19 @@ public class FragmentInstanceContext extends QueryContext
{
}
// used for compaction
- private FragmentInstanceContext(long queryId) {
+ protected FragmentInstanceContext(
+ long queryId,
+ MemoryReservationManager memoryReservationManager,
+ Filter timeFilter,
+ DataRegion dataRegion) {
this.queryId = queryId;
this.id = null;
this.stateMachine = null;
this.dataNodeQueryContextMap = null;
this.dataNodeQueryContext = null;
- this.memoryReservationManager = null;
+ this.dataRegion = dataRegion;
+ this.globalTimeFilter = timeFilter;
+ this.memoryReservationManager = memoryReservationManager;
}
public void start() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index c8352d5f5e5..f0a210e1edf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -83,7 +83,7 @@ public class QueryContext {
this.timeout = timeout;
}
- private boolean checkIfModificationExists(TsFileResource tsFileResource) {
+ protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
if (nonExistentModFiles.contains(tsFileResource.getTsFileID())) {
return false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTaskId.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTaskId.java
index 865af681b37..3dff2044a6e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTaskId.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTaskId.java
@@ -36,11 +36,12 @@ public class DriverTaskId implements ID,
Comparable<DriverTaskId> {
// Currently, we just save pipelineId in driverTask since it's one-to-one
relation.
private final int pipelineId;
private final String fullId;
+ private static final String EMPTY_FULL_ID = "EmptyFullId";
public DriverTaskId(FragmentInstanceId id, int pipelineId) {
this.fragmentInstanceId = id;
this.pipelineId = pipelineId;
- this.fullId = String.format("%s.%d", id.getFullId(), pipelineId);
+ this.fullId = String.format("%s.%d", id == null ? EMPTY_FULL_ID :
id.getFullId(), pipelineId);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
new file mode 100644
index 00000000000..265ca47ca23
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+public class FakedMemoryReservationManager implements MemoryReservationManager
{
+
+ @Override
+ public void reserveMemoryCumulatively(long size) {}
+
+ @Override
+ public void reserveMemoryImmediately() {}
+
+ @Override
+ public void releaseMemoryCumulatively(long size) {}
+
+ @Override
+ public void releaseAllReservedMemory() {}
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
index 35cedd5d1c0..a1d7b3dafbf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
@@ -354,7 +354,7 @@ public class TsFileResourceList implements
List<TsFileResource> {
}
public List<TsFileResource> getArrayList() {
- List<TsFileResource> list = new ArrayList<>();
+ List<TsFileResource> list = new ArrayList<>(count);
TsFileResource current = header;
while (current != null) {
list.add(current);
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 917a9a57900..0fa1a9a6ecb 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -378,6 +378,7 @@ struct TSGroupByQueryIntervalReq {
10: optional i64 interval
11: optional i32 fetchSize
12: optional i64 timeout
+ 13: optional bool isAligned
}
struct TSCreateMultiTimeseriesReq {