This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch benchants_branch
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/benchants_branch by this push:
new a74f9cea9cd Add aligned parameter
a74f9cea9cd is described below
commit a74f9cea9cdfd6f0daa2261e6d8f9b5db119bcb0
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 7 13:58:24 2023 +0800
Add aligned parameter
---
.../service/thrift/impl/ClientRPCServiceImpl.java | 67 ++++++++++++++--------
1 file changed, 44 insertions(+), 23 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 67db441a311..77836500d90 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -47,7 +49,9 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.mpp.execution.operator.source.AbstractSeriesAggregationScanOperator;
import
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
@@ -596,6 +600,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
String device,
String measurement,
TSDataType dataType,
+ boolean isAligned,
long startTime,
long endTme,
long interval,
@@ -609,13 +614,6 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
"dataRegionList.size() should only be 1 now, current size is " +
dataRegionSize);
}
- IMeasurementSchema measurementSchema = new MeasurementSchema(measurement,
dataType);
- AlignedPath alignedPath =
- new AlignedPath(
- device,
- Collections.singletonList(measurement),
- Collections.singletonList(measurementSchema));
-
Filter timeFilter = new TimeFilter.TimeGtEqAndLt(startTime, endTme);
QueryId queryId = new QueryId("stub_query");
@@ -647,25 +645,48 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
GroupByTimeParameter groupByTimeParameter =
new GroupByTimeParameter(startTime, endTme, interval, interval, true);
- AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
- new AlignedSeriesAggregationScanOperator(
- planNodeId,
- alignedPath,
- Ordering.ASC,
- scanOptionsBuilder.build(),
- driverContext.getOperatorContexts().get(0),
- Collections.singletonList(aggregator),
- initTimeRangeIterator(groupByTimeParameter, true, true),
- groupByTimeParameter,
- DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ IMeasurementSchema measurementSchema = new MeasurementSchema(measurement,
dataType);
+ AbstractSeriesAggregationScanOperator operator;
+ PartialPath path;
+ if (isAligned) {
+ path =
+ new AlignedPath(
+ device,
+ Collections.singletonList(measurement),
+ Collections.singletonList(measurementSchema));
+ operator =
+ new AlignedSeriesAggregationScanOperator(
+ planNodeId,
+ (AlignedPath) path,
+ Ordering.ASC,
+ scanOptionsBuilder.build(),
+ driverContext.getOperatorContexts().get(0),
+ Collections.singletonList(aggregator),
+ initTimeRangeIterator(groupByTimeParameter, true, true),
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ } else {
+ path = new MeasurementPath(device, measurement, measurementSchema);
+ operator =
+ new SeriesAggregationScanOperator(
+ planNodeId,
+ path,
+ Ordering.ASC,
+ scanOptionsBuilder.build(),
+ driverContext.getOperatorContexts().get(0),
+ Collections.singletonList(aggregator),
+ initTimeRangeIterator(groupByTimeParameter, true, true),
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ }
+
try {
List<TsBlock> result = new ArrayList<>();
-
fragmentInstanceContext.setSourcePaths(Collections.singletonList(alignedPath));
- seriesAggregationScanOperator.initQueryDataSource(
- fragmentInstanceContext.getSharedQueryDataSource());
+ fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
+
operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());
- while (seriesAggregationScanOperator.hasNext()) {
- result.add(seriesAggregationScanOperator.next());
+ while (operator.hasNext()) {
+ result.add(operator.next());
}
return result;