This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch OptGroupByQuery
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec270bc23c8b4e4eada8e10c78a76e0b5ca680fa
Author: JackieTien97 <[email protected]>
AuthorDate: Fri Mar 21 11:23:28 2025 +0800

    Opt executeGroupByQueryInternal
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |  42 +++------
 .../execution/driver/DriverContext.java            |   6 ++
 .../fragment/FakedFragmentInstanceContext.java     | 101 +++++++++++++++++++++
 .../fragment/FragmentInstanceContext.java          |  19 ++--
 .../memory/FakedMemoryReservationManager.java      |  35 +++++++
 .../dataregion/tsfile/TsFileResourceList.java      |   2 +-
 6 files changed, 171 insertions(+), 34 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 4c2c4a4e654..e66e626c109 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -53,9 +53,6 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.protocol.thrift.OperationType;
-import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
-import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
-import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
@@ -63,14 +60,12 @@ import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
 import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
-import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
-import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.fragment.FakedFragmentInstanceContext;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
 import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesAggregationScanOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregationScanOperator;
-import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -109,6 +104,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSch
 import org.apache.iotdb.db.schemaengine.template.TemplateQueryType;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
 import org.apache.iotdb.db.storageengine.rescon.quotas.OperationQuota;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
@@ -205,7 +201,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
 import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
-import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
 import static org.apache.iotdb.db.utils.CommonUtils.getContentOfTSFastLastDataQueryForOneDeviceReq;
@@ -244,7 +239,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE = DataNodeSchemaCache.getInstance();
 
-  public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS);
+  public static final Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS);
 
   private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
       TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
@@ -677,21 +672,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
     Filter timeFilter = TimeFilterApi.between(startTime, endTime - 1);
 
-    QueryId queryId = new QueryId("stub_query");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
-    FragmentInstanceStateMachine stateMachine =
-        new FragmentInstanceStateMachine(
-            instanceId, FragmentInstanceManager.getInstance().instanceNotificationExecutor);
-    FragmentInstanceContext fragmentInstanceContext =
-        createFragmentInstanceContext(
-            instanceId, stateMachine, sessionInfo, dataRegionList.get(0), timeFilter);
+    FakedFragmentInstanceContext fragmentInstanceContext =
+        new FakedFragmentInstanceContext(timeFilter, dataRegionList.get(0));
+
     DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
     PlanNodeId planNodeId = new PlanNodeId("1");
-    driverContext.addOperatorContext(1, planNodeId, SeriesScanOperator.class.getSimpleName());
-    driverContext
-        .getOperatorContexts()
-        .forEach(operatorContext -> operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));
+    OperatorContext operatorContext =
+        new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator", driverContext);
+    operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE);
 
     SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
     scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
@@ -730,7 +718,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               (AlignedPath) path,
               Ordering.ASC,
               scanOptionsBuilder.build(),
-              driverContext.getOperatorContexts().get(0),
+              operatorContext,
               Collections.singletonList(aggregator),
               initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()),
               groupByTimeParameter,
@@ -746,7 +734,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               path,
               Ordering.ASC,
               scanOptionsBuilder.build(),
-              driverContext.getOperatorContexts().get(0),
+              operatorContext,
               Collections.singletonList(aggregator),
               initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()),
               groupByTimeParameter,
@@ -758,8 +746,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
     try {
       List<TsBlock> result = new ArrayList<>();
-      fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
-      operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());
+      QueryDataSource dataSource = fragmentInstanceContext.getSharedQueryDataSource(path);
+      operator.initQueryDataSource(dataSource);
 
       while (operator.hasNext()) {
         result.add(operator.next());
@@ -769,7 +757,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     } catch (Exception e) {
       throw new RuntimeException(e);
     } finally {
-      fragmentInstanceContext.releaseResource();
+      fragmentInstanceContext.releaseSharedQueryDataSource();
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
index 79cb4acf3b9..b16de9d633b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
@@ -50,6 +50,12 @@ public class DriverContext {
     this.fragmentInstanceContext = null;
   }
 
+  @TestOnly
+  // should only be used by executeGroupByQueryInternal
+  public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
+    this.fragmentInstanceContext = fragmentInstanceContext;
+  }
+
   public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
     this.fragmentInstanceContext = fragmentInstanceContext;
     this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
new file mode 100644
index 00000000000..b814fe96885
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.plan.planner.memory.FakedMemoryReservationManager;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.read.filter.basic.Filter;
+
+import java.util.Collections;
+import java.util.List;
+
+public class FakedFragmentInstanceContext extends FragmentInstanceContext {
+
+  public FakedFragmentInstanceContext(Filter timeFilter, DataRegion dataRegion) {
+    super(0, new FakedMemoryReservationManager(), timeFilter, dataRegion);
+  }
+
+  public QueryDataSource getSharedQueryDataSource(PartialPath sourcePath)
+      throws QueryProcessException {
+    if (sharedQueryDataSource == null) {
+      initQueryDataSource(sourcePath);
+    }
+    return (QueryDataSource) sharedQueryDataSource;
+  }
+
+  public void initQueryDataSource(PartialPath sourcePath) throws QueryProcessException {
+
+    dataRegion.readLock();
+    try {
+      this.sharedQueryDataSource =
+          dataRegion.query(
+              Collections.singletonList(sourcePath),
+              sourcePath.getDevice(),
+              this,
+              getGlobalTimeFilter(),
+              null);
+
+      // used files should be added before mergeLock is unlocked, or they may be deleted by
+      // running merge
+      if (sharedQueryDataSource != null) {
+        ((QueryDataSource) sharedQueryDataSource).setSingleDevice(true);
+        List<TsFileResource> tsFileList =
+            ((QueryDataSource) sharedQueryDataSource).getSeqResources();
+        if (tsFileList != null) {
+          for (TsFileResource tsFile : tsFileList) {
+            FileReaderManager.getInstance().increaseFileReaderReference(tsFile, tsFile.isClosed());
+          }
+        }
+        tsFileList = ((QueryDataSource) sharedQueryDataSource).getUnseqResources();
+        if (tsFileList != null) {
+          for (TsFileResource tsFile : tsFileList) {
+            FileReaderManager.getInstance().increaseFileReaderReference(tsFile, tsFile.isClosed());
+          }
+        }
+      }
+    } finally {
+      dataRegion.readUnlock();
+    }
+  }
+
+  public void releaseSharedQueryDataSource() {
+    if (sharedQueryDataSource != null) {
+      List<TsFileResource> tsFileList = ((QueryDataSource) sharedQueryDataSource).getSeqResources();
+      if (tsFileList != null) {
+        for (TsFileResource tsFile : tsFileList) {
+          FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, tsFile.isClosed());
+        }
+      }
+      tsFileList = ((QueryDataSource) sharedQueryDataSource).getUnseqResources();
+      if (tsFileList != null) {
+        for (TsFileResource tsFile : tsFileList) {
+          FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, tsFile.isClosed());
+        }
+      }
+      sharedQueryDataSource = null;
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 58f06bfb270..10f922d764b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
 import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
 import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
@@ -72,16 +73,16 @@ public class FragmentInstanceContext extends QueryContext {
 
   private final MemoryReservationManager memoryReservationManager;
 
-  private IDataRegionForQuery dataRegion;
+  protected IDataRegionForQuery dataRegion;
   private Filter globalTimeFilter;
 
   // it will only be used once, after sharedQueryDataSource being inited, it will be set to null
-  private List<PartialPath> sourcePaths;
+  protected List<PartialPath> sourcePaths;
   // Used for region scan.
   private Map<IDeviceID, DeviceContext> devicePathsToContext;
 
   // Shared by all scan operators in this fragment instance to avoid memory problem
-  private IQueryDataSource sharedQueryDataSource;
+  protected IQueryDataSource sharedQueryDataSource;
 
   /** closed tsfile used in this fragment instance. */
   private Set<TsFileResource> closedFilePaths;
@@ -168,7 +169,7 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   public static FragmentInstanceContext createFragmentInstanceContextForCompaction(long queryId) {
-    return new FragmentInstanceContext(queryId);
+    return new FragmentInstanceContext(queryId, null, null, null);
   }
 
   public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) {
@@ -241,13 +242,19 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   // used for compaction
-  private FragmentInstanceContext(long queryId) {
+  protected FragmentInstanceContext(
+      long queryId,
+      MemoryReservationManager memoryReservationManager,
+      Filter timeFilter,
+      DataRegion dataRegion) {
     this.queryId = queryId;
     this.id = null;
     this.stateMachine = null;
     this.dataNodeQueryContextMap = null;
     this.dataNodeQueryContext = null;
-    this.memoryReservationManager = null;
+    this.dataRegion = dataRegion;
+    this.globalTimeFilter = timeFilter;
+    this.memoryReservationManager = memoryReservationManager;
   }
 
   public void start() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
new file mode 100644
index 00000000000..265ca47ca23
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+public class FakedMemoryReservationManager implements MemoryReservationManager {
+
+  @Override
+  public void reserveMemoryCumulatively(long size) {}
+
+  @Override
+  public void reserveMemoryImmediately() {}
+
+  @Override
+  public void releaseMemoryCumulatively(long size) {}
+
+  @Override
+  public void releaseAllReservedMemory() {}
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
index 35cedd5d1c0..a1d7b3dafbf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
@@ -354,7 +354,7 @@ public class TsFileResourceList implements List<TsFileResource> {
   }
 
   public List<TsFileResource> getArrayList() {
-    List<TsFileResource> list = new ArrayList<>();
+    List<TsFileResource> list = new ArrayList<>(count);
     TsFileResource current = header;
     while (current != null) {
       list.add(current);

Reply via email to