This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch benchants_branch
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/benchants_branch by this push:
     new fa90736430d Add SessionInfo as parameter
fa90736430d is described below

commit fa90736430d078dc13eb65f0c0e8b4b6b9ac78d6
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 7 11:17:15 2023 +0800

    Add SessionInfo as parameter
---
 .../fragment/FragmentInstanceContext.java          |  2 +-
 .../fragment/FragmentInstanceManager.java          |  2 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 58 +++++++++++++++++++---
 3 files changed, 52 insertions(+), 10 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 6434e8faa06..37120a4c102 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -381,7 +381,7 @@ public class FragmentInstanceContext extends QueryContext {
    * All file paths used by this fragment instance must be cleared and thus 
the usage reference must
    * be decreased.
    */
-  protected synchronized void releaseResource() {
+  public synchronized void releaseResource() {
     // For schema related query FI, closedFilePaths and unClosedFilePaths will 
be null
     if (closedFilePaths != null) {
       for (TsFileResource tsFile : closedFilePaths) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 74e2985207d..c5ccd63f2d1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -64,7 +64,7 @@ public class FragmentInstanceManager {
   private final IDriverScheduler scheduler = DriverScheduler.getInstance();
 
   private final ScheduledExecutorService instanceManagementExecutor;
-  private final ExecutorService instanceNotificationExecutor;
+  public final ExecutorService instanceNotificationExecutor;
 
   private final Duration infoCacheTime;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index bccd8f24a14..67db441a311 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -34,15 +34,18 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
@@ -56,6 +59,9 @@ import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.ASTVisitor;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementType;
@@ -140,6 +146,7 @@ import 
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -585,6 +592,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   // 2. call executeGroupByQueryInternal 拿到List<TsBlock>
   // 3. 组装结果集返回给客户端,模仿QueryDataSetUtils.convertTsBlockByFetchSize
   private List<TsBlock> executeGroupByQueryInternal(
+      SessionInfo sessionInfo,
       String device,
       String measurement,
       TSDataType dataType,
@@ -594,6 +602,13 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       TAggregationType aggregationType,
       List<DataRegion> dataRegionList)
       throws IllegalPathException {
+
+    int dataRegionSize = dataRegionList.size();
+    if (dataRegionSize != 1) {
+      throw new IllegalArgumentException(
+          "dataRegionList.size() should only be 1 now,  current size is " + 
dataRegionSize);
+    }
+
     IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, 
dataType);
     AlignedPath alignedPath =
         new AlignedPath(
@@ -601,13 +616,17 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
             Collections.singletonList(measurement),
             Collections.singletonList(measurementSchema));
 
+    Filter timeFilter = new TimeFilter.TimeGtEqAndLt(startTime, endTme);
+
     QueryId queryId = new QueryId("stub_query");
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
     FragmentInstanceStateMachine stateMachine =
-        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+        new FragmentInstanceStateMachine(
+            instanceId, 
FragmentInstanceManager.getInstance().instanceNotificationExecutor);
     FragmentInstanceContext fragmentInstanceContext =
-        createFragmentInstanceContext(instanceId, stateMachine);
+        createFragmentInstanceContext(
+            instanceId, stateMachine, sessionInfo, dataRegionList.get(0), 
timeFilter);
     DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
     PlanNodeId planNodeId = new PlanNodeId("1");
     driverContext.addOperatorContext(1, planNodeId, 
SeriesScanOperator.class.getSimpleName());
@@ -617,7 +636,16 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
     SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
     scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
-    scanOptionsBuilder.withGlobalTimeFilter(new 
TimeFilter.TimeGtEqAndLt(startTime, endTme));
+    scanOptionsBuilder.withGlobalTimeFilter(timeFilter);
+
+    Aggregator aggregator =
+        new Aggregator(
+            AccumulatorFactory.createAccumulator(aggregationType, dataType, 
null, null, true),
+            AggregationStep.SINGLE,
+            Collections.singletonList(new InputLocation[] {new 
InputLocation(0, 0)}));
+
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(startTime, endTme, interval, interval, true);
 
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
@@ -626,12 +654,26 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
             Ordering.ASC,
             scanOptionsBuilder.build(),
             driverContext.getOperatorContexts().get(0),
-            aggregators,
-            initTimeRangeIterator(groupByTimeParameter, ascending, true),
+            Collections.singletonList(aggregator),
+            initTimeRangeIterator(groupByTimeParameter, true, true),
             groupByTimeParameter,
             DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
-    seriesAggregationScanOperator.initQueryDataSource(
-        new QueryDataSource(seqResources, unSeqResources));
+    try {
+      List<TsBlock> result = new ArrayList<>();
+      
fragmentInstanceContext.setSourcePaths(Collections.singletonList(alignedPath));
+      seriesAggregationScanOperator.initQueryDataSource(
+          fragmentInstanceContext.getSharedQueryDataSource());
+
+      while (seriesAggregationScanOperator.hasNext()) {
+        result.add(seriesAggregationScanOperator.next());
+      }
+
+      return result;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      fragmentInstanceContext.releaseResource();
+    }
   }
 
   @Override

Reply via email to