This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch benchants_branch
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/benchants_branch by this push:
new fa90736430d Add SessionInfo as parameter
fa90736430d is described below
commit fa90736430d078dc13eb65f0c0e8b4b6b9ac78d6
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 7 11:17:15 2023 +0800
Add SessionInfo as parameter
---
.../fragment/FragmentInstanceContext.java | 2 +-
.../fragment/FragmentInstanceManager.java | 2 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 58 +++++++++++++++++++---
3 files changed, 52 insertions(+), 10 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 6434e8faa06..37120a4c102 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -381,7 +381,7 @@ public class FragmentInstanceContext extends QueryContext {
* All file paths used by this fragment instance must be cleared and thus
the usage reference must
* be decreased.
*/
- protected synchronized void releaseResource() {
+ public synchronized void releaseResource() {
// For schema related query FI, closedFilePaths and unClosedFilePaths will
be null
if (closedFilePaths != null) {
for (TsFileResource tsFile : closedFilePaths) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 74e2985207d..c5ccd63f2d1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -64,7 +64,7 @@ public class FragmentInstanceManager {
private final IDriverScheduler scheduler = DriverScheduler.getInstance();
private final ScheduledExecutorService instanceManagementExecutor;
- private final ExecutorService instanceNotificationExecutor;
+ public final ExecutorService instanceNotificationExecutor;
private final Duration infoCacheTime;
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index bccd8f24a14..67db441a311 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -34,15 +34,18 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
@@ -56,6 +59,9 @@ import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.ASTVisitor;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
@@ -140,6 +146,7 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -585,6 +592,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// 2. call executeGroupByQueryInternal to obtain a List<TsBlock>
// 3. assemble the result set and return it to the client, modeled on QueryDataSetUtils.convertTsBlockByFetchSize
private List<TsBlock> executeGroupByQueryInternal(
+ SessionInfo sessionInfo,
String device,
String measurement,
TSDataType dataType,
@@ -594,6 +602,13 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
TAggregationType aggregationType,
List<DataRegion> dataRegionList)
throws IllegalPathException {
+
+ int dataRegionSize = dataRegionList.size();
+ if (dataRegionSize != 1) {
+ throw new IllegalArgumentException(
+ "dataRegionList.size() should only be 1 now, current size is " +
dataRegionSize);
+ }
+
IMeasurementSchema measurementSchema = new MeasurementSchema(measurement,
dataType);
AlignedPath alignedPath =
new AlignedPath(
@@ -601,13 +616,17 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
Collections.singletonList(measurement),
Collections.singletonList(measurementSchema));
+ Filter timeFilter = new TimeFilter.TimeGtEqAndLt(startTime, endTme);
+
QueryId queryId = new QueryId("stub_query");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
FragmentInstanceStateMachine stateMachine =
- new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ new FragmentInstanceStateMachine(
+ instanceId,
FragmentInstanceManager.getInstance().instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- createFragmentInstanceContext(instanceId, stateMachine);
+ createFragmentInstanceContext(
+ instanceId, stateMachine, sessionInfo, dataRegionList.get(0),
timeFilter);
DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
PlanNodeId planNodeId = new PlanNodeId("1");
driverContext.addOperatorContext(1, planNodeId,
SeriesScanOperator.class.getSimpleName());
@@ -617,7 +636,16 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
- scanOptionsBuilder.withGlobalTimeFilter(new
TimeFilter.TimeGtEqAndLt(startTime, endTme));
+ scanOptionsBuilder.withGlobalTimeFilter(timeFilter);
+
+ Aggregator aggregator =
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(aggregationType, dataType,
null, null, true),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new InputLocation[] {new
InputLocation(0, 0)}));
+
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(startTime, endTme, interval, interval, true);
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
@@ -626,12 +654,26 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
Ordering.ASC,
scanOptionsBuilder.build(),
driverContext.getOperatorContexts().get(0),
- aggregators,
- initTimeRangeIterator(groupByTimeParameter, ascending, true),
+ Collections.singletonList(aggregator),
+ initTimeRangeIterator(groupByTimeParameter, true, true),
groupByTimeParameter,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
- seriesAggregationScanOperator.initQueryDataSource(
- new QueryDataSource(seqResources, unSeqResources));
+ try {
+ List<TsBlock> result = new ArrayList<>();
+
fragmentInstanceContext.setSourcePaths(Collections.singletonList(alignedPath));
+ seriesAggregationScanOperator.initQueryDataSource(
+ fragmentInstanceContext.getSharedQueryDataSource());
+
+ while (seriesAggregationScanOperator.hasNext()) {
+ result.add(seriesAggregationScanOperator.next());
+ }
+
+ return result;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ fragmentInstanceContext.releaseResource();
+ }
}
@Override