This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch benchants_branch
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/benchants_branch by this push:
new 05ad8c1ad16 Add effective query interface
05ad8c1ad16 is described below
commit 05ad8c1ad164ad88ad878a944534e4a00d7f7991
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 7 10:31:51 2023 +0800
Add effective query interface
---
server/pom.xml | 2 +-
.../AbstractSeriesAggregationScanOperator.java | 1 -
.../UpdateCacheRestorableTsFileIOWriter.java | 4 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 79 +++++++++++++++++++
.../iotdb/tsfile/read/filter/TimeFilter.java | 89 ++++++++++++++++++++++
.../tsfile/read/filter/factory/FilterFactory.java | 4 +
.../read/filter/factory/FilterSerializeId.java | 3 +-
7 files changed, 178 insertions(+), 4 deletions(-)
diff --git a/server/pom.xml b/server/pom.xml
index e01e6625b5f..6dc53c1d5fa 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -487,7 +487,7 @@
<buildArg>-J-Djdk.nio.maxCachedBufferSize=536870912</buildArg>
<!--
<buildArg>--initialize-at-run-time=io.micronaut.configuration.metrics,io.micrometer,io.netty</buildArg>-->
<!-- For Quick Build (22.1+) -->
-<!-- <buildArg>-Ob</buildArg>-->
+ <!--
<buildArg>-Ob</buildArg>-->
</buildArgs>
<mainClass>org.apache.iotdb.db.service.DataNode</mainClass>
<!-- Start: Workaround for 22.2: Disable the
default Java Module Path using USE_NATIVE_IMAGE_JAVA_PLATFORM_MODULE_SYSTEM -->
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 819625c53ee..0e342d38321 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -65,7 +65,6 @@ public abstract class AbstractSeriesAggregationScanOperator
extends AbstractData
protected boolean finished = false;
private final long cachedRawDataSize;
- private final long maxReturnSize;
protected AbstractSeriesAggregationScanOperator(
PlanNodeId sourceId,
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/UpdateCacheRestorableTsFileIOWriter.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/UpdateCacheRestorableTsFileIOWriter.java
index cefbb412fa9..09c92c2e5e7 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/UpdateCacheRestorableTsFileIOWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/UpdateCacheRestorableTsFileIOWriter.java
@@ -48,7 +48,9 @@ public class UpdateCacheRestorableTsFileIOWriter extends
RestorableTsFileIOWrite
@Override
protected void updateCache(String device, TimeseriesMetadata
timeseriesMetadata) {
- TIME_SERIES_METADATA_CACHE.updateCache(file.getPath(), device,
timeseriesMetadata);
+ if (device.startsWith("root.cpu")) { // NOTE(review): hard-coded device prefix; devices not starting with "root.cpu" now skip the cache update - confirm this is intended
+ TIME_SERIES_METADATA_CACHE.updateCache(file.getPath(), device,
timeseriesMetadata);
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 28bec0979bd..bccd8f24a14 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -18,11 +18,13 @@
*/
package org.apache.iotdb.db.service.thrift.impl;
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -32,8 +34,18 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -43,8 +55,11 @@ import
org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.ASTVisitor;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
@@ -121,10 +136,15 @@ import
org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import io.airlift.units.Duration;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,14 +153,18 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@@ -157,6 +181,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
private static final boolean enableAuditLog = config.isEnableAuditLog();
+ public static Duration DEFAULT_TIME_SLICE = new Duration(60_000,
TimeUnit.MILLISECONDS);
+
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
@@ -555,6 +581,59 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
}
+ // 1. Apply rate limiting ()
+ // 2. Call executeGroupByQueryInternal to obtain a List<TsBlock>
+ // 3. Assemble the result set and return it to the client, modeled on QueryDataSetUtils.convertTsBlockByFetchSize
+ private List<TsBlock> executeGroupByQueryInternal(
+ String device,
+ String measurement,
+ TSDataType dataType,
+ long startTime,
+ long endTme,
+ long interval,
+ TAggregationType aggregationType,
+ List<DataRegion> dataRegionList)
+ throws IllegalPathException {
+ IMeasurementSchema measurementSchema = new MeasurementSchema(measurement,
dataType);
+ AlignedPath alignedPath =
+ new AlignedPath(
+ device,
+ Collections.singletonList(measurement),
+ Collections.singletonList(measurementSchema));
+ // Stub query, fragment-instance and driver contexts with fixed ids; every operator context gets DEFAULT_TIME_SLICE as its max run time.
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ driverContext.addOperatorContext(1, planNodeId,
SeriesScanOperator.class.getSimpleName());
+ driverContext
+ .getOperatorContexts()
+ .forEach(operatorContext ->
operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));
+ // Scan options: only this measurement, restricted by a TimeGtEqAndLt(startTime, endTme) global time filter.
+ SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
+ scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
+ scanOptionsBuilder.withGlobalTimeFilter(new
TimeFilter.TimeGtEqAndLt(startTime, endTme));
+ // NOTE(review): aggregators, groupByTimeParameter, ascending, seqResources and unSeqResources are not declared in this method; interval, aggregationType and dataRegionList are unused; nothing is returned despite the List<TsBlock> return type - looks unfinished, confirm.
+ AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
+ new AlignedSeriesAggregationScanOperator(
+ planNodeId,
+ alignedPath,
+ Ordering.ASC,
+ scanOptionsBuilder.build(),
+ driverContext.getOperatorContexts().get(0),
+ aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ seriesAggregationScanOperator.initQueryDataSource(
+ new QueryDataSource(seqResources, unSeqResources));
+ }
+
@Override
public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq
req) {
return executeStatementV2(req);
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
index 0cbd94dc0a6..cda1762e917 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
@@ -18,8 +18,10 @@
*/
package org.apache.iotdb.tsfile.read.filter;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterSerializeId;
import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
import org.apache.iotdb.tsfile.read.filter.operator.Between;
import org.apache.iotdb.tsfile.read.filter.operator.Eq;
@@ -29,7 +31,11 @@ import org.apache.iotdb.tsfile.read.filter.operator.In;
import org.apache.iotdb.tsfile.read.filter.operator.Lt;
import org.apache.iotdb.tsfile.read.filter.operator.LtEq;
import org.apache.iotdb.tsfile.read.filter.operator.NotEq;
+import org.apache.iotdb.tsfile.read.filter.operator.OrFilter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -217,6 +223,89 @@ public class TimeFilter {
}
}
+  /** Time filter accepting timestamps in the half-open range {@code [startTime, endTime)}. */
+  public static class TimeGtEqAndLt implements Filter {
+    // Inclusive lower bound / exclusive upper bound; not final because deserialize() assigns them.
+    private long startTime;
+    private long endTime;
+    /** Creates a filter with both bounds at 0; {@link #deserialize(ByteBuffer)} overwrites them. */
+    public TimeGtEqAndLt() {}
+    /** Creates a filter for {@code startTime <= time < endTime}. */
+    public TimeGtEqAndLt(long startTime, long endTime) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+    /** Returns true if the statistics' [startTime, endTime] span overlaps the filter range. */
+    @Override
+    public boolean satisfy(Statistics statistics) {
+      return !(statistics.getEndTime() < startTime || statistics.getStartTime() >= endTime);
+    }
+    /** Returns true if the statistics' whole time span lies inside the filter range. */
+    @Override
+    public boolean allSatisfy(Statistics statistics) {
+      return startTime <= statistics.getStartTime() && statistics.getEndTime() < endTime;
+    }
+    /** Returns true if {@code time} is in range; {@code value} is ignored. */
+    @Override
+    public boolean satisfy(long time, Object value) {
+      return startTime <= time && time < endTime;
+    }
+    /** Returns true if [startTime, endTime] overlaps the filter range (overlap, not containment). */
+    @Override
+    public boolean satisfyStartEndTime(long startTime, long endTime) {
+      return !(endTime < this.startTime || startTime >= this.endTime);
+    }
+    /** Returns true if [startTime, endTime] lies entirely inside the filter range. */
+    @Override
+    public boolean containStartEndTime(long startTime, long endTime) {
+      return this.startTime <= startTime && endTime < this.endTime;
+    }
+    /** Returns a new filter with the same bounds. */
+    @Override
+    public Filter copy() {
+      return new TimeGtEqAndLt(startTime, endTime);
+    }
+    /** Returns a debug string showing both bounds. */
+    @Override
+    public String toString() {
+      return "TimeGtEqAndLt{" + "startTime=" + startTime + ", endTime=" + endTime + '}';
+    }
+    /** Writes the serialize-id ordinal as one byte, then startTime and endTime as longs. */
+    @Override
+    public void serialize(DataOutputStream outputStream) {
+      try {
+        outputStream.write(getSerializeId().ordinal());
+        outputStream.writeLong(startTime);
+        outputStream.writeLong(endTime);
+      } catch (IOException ignored) {
+        // best effort, unchanged from the original: a failed write is silently dropped
+      }
+    }
+    /** Reads startTime then endTime; the id byte written by serialize is not read here. */
+    @Override
+    public void deserialize(ByteBuffer buffer) {
+      startTime = buffer.getLong();
+      endTime = buffer.getLong();
+    }
+    /** Returns {@link FilterSerializeId#TIME_GTEQ_AND_LT}. */
+    @Override
+    public FilterSerializeId getSerializeId() {
+      return FilterSerializeId.TIME_GTEQ_AND_LT;
+    }
+    /** Returns the closed range [startTime, endTime - 1], or an empty list if the range is empty. */
+    @Override
+    public List<TimeRange> getTimeRanges() {
+      return startTime >= endTime
+          ? Collections.emptyList()
+          : Collections.singletonList(new TimeRange(startTime, endTime - 1));
+    }
+    /** Returns the complement: {@code time < startTime || time >= endTime}. */
+    @Override
+    public Filter reverse() {
+      return new OrFilter(new TimeLt(startTime), new TimeGtEq(endTime));
+    }
+  }
+
/**
* returns a default time filter by whether it's an ascending query.
*
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
index 6d647b96c55..c3e1e9b3b82 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.read.filter.factory;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.filter.operator.Between;
@@ -101,6 +102,9 @@ public class FilterFactory {
case BETWEEN:
filter = new Between<>();
break;
+ case TIME_GTEQ_AND_LT:
+ filter = new TimeFilter.TimeGtEqAndLt();
+ break;
default:
throw new UnsupportedOperationException("Unknown filter type " + id);
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
index bb3c7e10608..912bc9f3360 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
@@ -34,5 +34,6 @@ public enum FilterSerializeId {
IN,
REGEXP,
LIKE,
- BETWEEN
+ BETWEEN,
+ TIME_GTEQ_AND_LT
}