This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch OptGroupByQuery in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ec270bc23c8b4e4eada8e10c78a76e0b5ca680fa Author: JackieTien97 <[email protected]> AuthorDate: Fri Mar 21 11:23:28 2025 +0800 Opt executeGroupByQueryInternal --- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 42 +++------ .../execution/driver/DriverContext.java | 6 ++ .../fragment/FakedFragmentInstanceContext.java | 101 +++++++++++++++++++++ .../fragment/FragmentInstanceContext.java | 19 ++-- .../memory/FakedMemoryReservationManager.java | 35 +++++++ .../dataregion/tsfile/TsFileResourceList.java | 2 +- 6 files changed, 171 insertions(+), 34 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 4c2c4a4e654..e66e626c109 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -53,9 +53,6 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.protocol.thrift.OperationType; -import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; -import org.apache.iotdb.db.queryengine.common.PlanFragmentId; -import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; @@ -63,14 +60,12 @@ import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory; import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator; import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; -import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; -import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager; -import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.execution.fragment.FakedFragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil; import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregationScanOperator; -import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; @@ -109,6 +104,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSch import org.apache.iotdb.db.schemaengine.template.TemplateQueryType; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.OperationQuota; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; @@ -205,7 +201,6 @@ import java.util.concurrent.TimeUnit; import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED; import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; -import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest; import static org.apache.iotdb.db.utils.CommonUtils.getContentOfTSFastLastDataQueryForOneDeviceReq; @@ -244,7 +239,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE = DataNodeSchemaCache.getInstance(); - public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS); + public static final Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS); private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); @@ -677,21 +672,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { Filter timeFilter = TimeFilterApi.between(startTime, endTime - 1); - QueryId queryId = new QueryId("stub_query"); - FragmentInstanceId instanceId = - new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); - FragmentInstanceStateMachine stateMachine = - new FragmentInstanceStateMachine( - instanceId, FragmentInstanceManager.getInstance().instanceNotificationExecutor); - FragmentInstanceContext fragmentInstanceContext = - createFragmentInstanceContext( - instanceId, stateMachine, sessionInfo, dataRegionList.get(0), timeFilter); + FakedFragmentInstanceContext fragmentInstanceContext = + new FakedFragmentInstanceContext(timeFilter, dataRegionList.get(0)); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); PlanNodeId planNodeId = new PlanNodeId("1"); - driverContext.addOperatorContext(1, planNodeId, SeriesScanOperator.class.getSimpleName()); - driverContext - .getOperatorContexts() - .forEach(operatorContext -> operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE)); + OperatorContext operatorContext = + new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator", driverContext); + operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE); SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); scanOptionsBuilder.withAllSensors(Collections.singleton(measurement)); @@ -730,7 +718,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { (AlignedPath) path, Ordering.ASC, scanOptionsBuilder.build(), - driverContext.getOperatorContexts().get(0), + operatorContext, Collections.singletonList(aggregator), initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()), groupByTimeParameter, @@ -746,7 +734,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { path, Ordering.ASC, scanOptionsBuilder.build(), - driverContext.getOperatorContexts().get(0), + operatorContext, Collections.singletonList(aggregator), initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()), groupByTimeParameter, @@ -758,8 +746,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { try { List<TsBlock> result = new ArrayList<>(); - fragmentInstanceContext.setSourcePaths(Collections.singletonList(path)); - operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource()); + QueryDataSource dataSource = fragmentInstanceContext.getSharedQueryDataSource(path); + operator.initQueryDataSource(dataSource); while (operator.hasNext()) { result.add(operator.next()); @@ -769,7 +757,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } catch (Exception e) { throw new RuntimeException(e); } finally { - fragmentInstanceContext.releaseResource(); + fragmentInstanceContext.releaseSharedQueryDataSource(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java index 79cb4acf3b9..b16de9d633b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java @@ -50,6 +50,12 @@ public class DriverContext { this.fragmentInstanceContext = null; } + @TestOnly + // should only be used by executeGroupByQueryInternal + public DriverContext(FragmentInstanceContext fragmentInstanceContext) { + this.fragmentInstanceContext = fragmentInstanceContext; + } + public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) { this.fragmentInstanceContext = fragmentInstanceContext; this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java new file mode 100644 index 00000000000..b814fe96885 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.fragment; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.plan.planner.memory.FakedMemoryReservationManager; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.read.filter.basic.Filter; + +import java.util.Collections; +import java.util.List; + +public class FakedFragmentInstanceContext extends FragmentInstanceContext { + + public FakedFragmentInstanceContext(Filter timeFilter, DataRegion dataRegion) { + super(0, new FakedMemoryReservationManager(), timeFilter, dataRegion); + } + + public QueryDataSource getSharedQueryDataSource(PartialPath sourcePath) + throws QueryProcessException { + if (sharedQueryDataSource == null) { + initQueryDataSource(sourcePath); + } + return (QueryDataSource) sharedQueryDataSource; + } + + public void initQueryDataSource(PartialPath sourcePath) throws QueryProcessException { + + dataRegion.readLock(); + try { + this.sharedQueryDataSource = + dataRegion.query( + Collections.singletonList(sourcePath), + sourcePath.getDevice(), + this, + getGlobalTimeFilter(), + null); + + // used files should be added before mergeLock is unlocked, or they may be deleted by + // running merge + if (sharedQueryDataSource != null) { + ((QueryDataSource) sharedQueryDataSource).setSingleDevice(true); + List<TsFileResource> tsFileList = + ((QueryDataSource) sharedQueryDataSource).getSeqResources(); + if (tsFileList != null) { + for (TsFileResource tsFile : tsFileList) { + FileReaderManager.getInstance().increaseFileReaderReference(tsFile, tsFile.isClosed()); + } + } + tsFileList = ((QueryDataSource) sharedQueryDataSource).getUnseqResources(); + if (tsFileList != null) { + for (TsFileResource tsFile : tsFileList) { + FileReaderManager.getInstance().increaseFileReaderReference(tsFile, tsFile.isClosed()); + } + } + } + } finally { + dataRegion.readUnlock(); + } + } + + public void releaseSharedQueryDataSource() { + if (sharedQueryDataSource != null) { + List<TsFileResource> tsFileList = ((QueryDataSource) sharedQueryDataSource).getSeqResources(); + if (tsFileList != null) { + for (TsFileResource tsFile : tsFileList) { + FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, tsFile.isClosed()); + } + } + tsFileList = ((QueryDataSource) sharedQueryDataSource).getUnseqResources(); + if (tsFileList != null) { + for (TsFileResource tsFile : tsFileList) { + FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, tsFile.isClosed()); + } + } + sharedQueryDataSource = null; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 58f06bfb270..10f922d764b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; @@ -72,16 +73,16 @@ public class FragmentInstanceContext extends QueryContext { private final MemoryReservationManager memoryReservationManager; - private IDataRegionForQuery dataRegion; + protected IDataRegionForQuery dataRegion; private Filter globalTimeFilter; // it will only be used once, after sharedQueryDataSource being inited, it will be set to null - private List<PartialPath> sourcePaths; + protected List<PartialPath> sourcePaths; // Used for region scan. private Map<IDeviceID, DeviceContext> devicePathsToContext; // Shared by all scan operators in this fragment instance to avoid memory problem - private IQueryDataSource sharedQueryDataSource; + protected IQueryDataSource sharedQueryDataSource; /** closed tsfile used in this fragment instance. */ private Set<TsFileResource> closedFilePaths; @@ -168,7 +169,7 @@ public class FragmentInstanceContext extends QueryContext { } public static FragmentInstanceContext createFragmentInstanceContextForCompaction(long queryId) { - return new FragmentInstanceContext(queryId); + return new FragmentInstanceContext(queryId, null, null, null); } public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) { @@ -241,13 +242,19 @@ public class FragmentInstanceContext extends QueryContext { } // used for compaction - private FragmentInstanceContext(long queryId) { + protected FragmentInstanceContext( + long queryId, + MemoryReservationManager memoryReservationManager, + Filter timeFilter, + DataRegion dataRegion) { this.queryId = queryId; this.id = null; this.stateMachine = null; this.dataNodeQueryContextMap = null; this.dataNodeQueryContext = null; - this.memoryReservationManager = null; + this.dataRegion = dataRegion; + this.globalTimeFilter = timeFilter; + this.memoryReservationManager = memoryReservationManager; } public void start() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java new file mode 100644 index 00000000000..265ca47ca23 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.memory; + +public class FakedMemoryReservationManager implements MemoryReservationManager { + + @Override + public void reserveMemoryCumulatively(long size) {} + + @Override + public void reserveMemoryImmediately() {} + + @Override + public void releaseMemoryCumulatively(long size) {} + + @Override + public void releaseAllReservedMemory() {} +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java index 35cedd5d1c0..a1d7b3dafbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java @@ -354,7 +354,7 @@ public class TsFileResourceList implements List<TsFileResource> { } public List<TsFileResource> getArrayList() { - List<TsFileResource> list = new ArrayList<>(); + List<TsFileResource> list = new ArrayList<>(count); TsFileResource current = header; while (current != null) { list.add(current);
