This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch benchants_branch
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/benchants_branch by this push:
     new a74f9cea9cd Add aligned paramter
a74f9cea9cd is described below

commit a74f9cea9cdfd6f0daa2261e6d8f9b5db119bcb0
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 7 13:58:24 2023 +0800

    Add aligned paramter
---
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 67 ++++++++++++++--------
 1 file changed, 44 insertions(+), 23 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 67db441a311..77836500d90 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -47,7 +49,9 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.AbstractSeriesAggregationScanOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
@@ -596,6 +600,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       String device,
       String measurement,
       TSDataType dataType,
+      boolean isAligned,
       long startTime,
       long endTme,
       long interval,
@@ -609,13 +614,6 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
           "dataRegionList.size() should only be 1 now,  current size is " + 
dataRegionSize);
     }
 
-    IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, 
dataType);
-    AlignedPath alignedPath =
-        new AlignedPath(
-            device,
-            Collections.singletonList(measurement),
-            Collections.singletonList(measurementSchema));
-
     Filter timeFilter = new TimeFilter.TimeGtEqAndLt(startTime, endTme);
 
     QueryId queryId = new QueryId("stub_query");
@@ -647,25 +645,48 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     GroupByTimeParameter groupByTimeParameter =
         new GroupByTimeParameter(startTime, endTme, interval, interval, true);
 
-    AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
-        new AlignedSeriesAggregationScanOperator(
-            planNodeId,
-            alignedPath,
-            Ordering.ASC,
-            scanOptionsBuilder.build(),
-            driverContext.getOperatorContexts().get(0),
-            Collections.singletonList(aggregator),
-            initTimeRangeIterator(groupByTimeParameter, true, true),
-            groupByTimeParameter,
-            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, 
dataType);
+    AbstractSeriesAggregationScanOperator operator;
+    PartialPath path;
+    if (isAligned) {
+      path =
+          new AlignedPath(
+              device,
+              Collections.singletonList(measurement),
+              Collections.singletonList(measurementSchema));
+      operator =
+          new AlignedSeriesAggregationScanOperator(
+              planNodeId,
+              (AlignedPath) path,
+              Ordering.ASC,
+              scanOptionsBuilder.build(),
+              driverContext.getOperatorContexts().get(0),
+              Collections.singletonList(aggregator),
+              initTimeRangeIterator(groupByTimeParameter, true, true),
+              groupByTimeParameter,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    } else {
+      path = new MeasurementPath(device, measurement, measurementSchema);
+      operator =
+          new SeriesAggregationScanOperator(
+              planNodeId,
+              path,
+              Ordering.ASC,
+              scanOptionsBuilder.build(),
+              driverContext.getOperatorContexts().get(0),
+              Collections.singletonList(aggregator),
+              initTimeRangeIterator(groupByTimeParameter, true, true),
+              groupByTimeParameter,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    }
+
     try {
       List<TsBlock> result = new ArrayList<>();
-      
fragmentInstanceContext.setSourcePaths(Collections.singletonList(alignedPath));
-      seriesAggregationScanOperator.initQueryDataSource(
-          fragmentInstanceContext.getSharedQueryDataSource());
+      fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
+      
operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());
 
-      while (seriesAggregationScanOperator.hasNext()) {
-        result.add(seriesAggregationScanOperator.next());
+      while (operator.hasNext()) {
+        result.add(operator.next());
       }
 
       return result;

Reply via email to