This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch benchants_branch
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/benchants_branch by this push:
     new 05ad8c1ad16 Add effective query interface
05ad8c1ad16 is described below

commit 05ad8c1ad164ad88ad878a944534e4a00d7f7991
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 7 10:31:51 2023 +0800

    Add effective query interface
---
 server/pom.xml                                     |  2 +-
 .../AbstractSeriesAggregationScanOperator.java     |  1 -
 .../UpdateCacheRestorableTsFileIOWriter.java       |  4 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 79 +++++++++++++++++++
 .../iotdb/tsfile/read/filter/TimeFilter.java       | 89 ++++++++++++++++++++++
 .../tsfile/read/filter/factory/FilterFactory.java  |  4 +
 .../read/filter/factory/FilterSerializeId.java     |  3 +-
 7 files changed, 178 insertions(+), 4 deletions(-)

diff --git a/server/pom.xml b/server/pom.xml
index e01e6625b5f..6dc53c1d5fa 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -487,7 +487,7 @@
                                 
<buildArg>-J-Djdk.nio.maxCachedBufferSize=536870912</buildArg>
                                 <!--                                
<buildArg>&#45;&#45;initialize-at-run-time=io.micronaut.configuration.metrics,io.micrometer,io.netty</buildArg>-->
                                 <!-- For Quick Build (22.1+) -->
-<!--                                <buildArg>-Ob</buildArg>-->
+                                <!--                                
<buildArg>-Ob</buildArg>-->
                             </buildArgs>
                             
<mainClass>org.apache.iotdb.db.service.DataNode</mainClass>
                             <!-- Start: Workaround for 22.2: Disable the 
default Java Module Path using USE_NATIVE_IMAGE_JAVA_PLATFORM_MODULE_SYSTEM -->
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 819625c53ee..0e342d38321 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -65,7 +65,6 @@ public abstract class AbstractSeriesAggregationScanOperator 
extends AbstractData
   protected boolean finished = false;
 
   private final long cachedRawDataSize;
-  private final long maxReturnSize;
 
   protected AbstractSeriesAggregationScanOperator(
       PlanNodeId sourceId,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/reader/UpdateCacheRestorableTsFileIOWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/query/reader/UpdateCacheRestorableTsFileIOWriter.java
index cefbb412fa9..09c92c2e5e7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/reader/UpdateCacheRestorableTsFileIOWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/reader/UpdateCacheRestorableTsFileIOWriter.java
@@ -48,7 +48,9 @@ public class UpdateCacheRestorableTsFileIOWriter extends 
RestorableTsFileIOWrite
 
   @Override
   protected void updateCache(String device, TimeseriesMetadata 
timeseriesMetadata) {
-    TIME_SERIES_METADATA_CACHE.updateCache(file.getPath(), device, 
timeseriesMetadata);
+    if (device.startsWith("root.cpu")) {
+      TIME_SERIES_METADATA_CACHE.updateCache(file.getPath(), device, 
timeseriesMetadata);
+    }
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 28bec0979bd..bccd8f24a14 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -18,11 +18,13 @@
  */
 package org.apache.iotdb.db.service.thrift.impl;
 
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -32,8 +34,18 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -43,8 +55,11 @@ import 
org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.ASTVisitor;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
@@ -121,10 +136,15 @@ import 
org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import io.airlift.units.Duration;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,14 +153,18 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
@@ -157,6 +181,8 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   private static final boolean enableAuditLog = config.isEnableAuditLog();
 
+  public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, 
TimeUnit.MILLISECONDS);
+
   private final IPartitionFetcher partitionFetcher;
 
   private final ISchemaFetcher schemaFetcher;
@@ -555,6 +581,59 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     }
   }
 
+  // 1. Rate limiting()
+  // 2. Call executeGroupByQueryInternal to obtain the List<TsBlock>
+  // 3. Assemble the result set and return it to the client, modeled on QueryDataSetUtils.convertTsBlockByFetchSize
+  private List<TsBlock> executeGroupByQueryInternal(
+      String device,
+      String measurement,
+      TSDataType dataType,
+      long startTime,
+      long endTme,
+      long interval,
+      TAggregationType aggregationType,
+      List<DataRegion> dataRegionList)
+      throws IllegalPathException {
+    IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, 
dataType);
+    AlignedPath alignedPath =
+        new AlignedPath(
+            device,
+            Collections.singletonList(measurement),
+            Collections.singletonList(measurementSchema));
+
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+    PlanNodeId planNodeId = new PlanNodeId("1");
+    driverContext.addOperatorContext(1, planNodeId, 
SeriesScanOperator.class.getSimpleName());
+    driverContext
+        .getOperatorContexts()
+        .forEach(operatorContext -> 
operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));
+
+    SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
+    scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
+    scanOptionsBuilder.withGlobalTimeFilter(new 
TimeFilter.TimeGtEqAndLt(startTime, endTme));
+
+    AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
+        new AlignedSeriesAggregationScanOperator(
+            planNodeId,
+            alignedPath,
+            Ordering.ASC,
+            scanOptionsBuilder.build(),
+            driverContext.getOperatorContexts().get(0),
+            aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    seriesAggregationScanOperator.initQueryDataSource(
+        new QueryDataSource(seqResources, unSeqResources));
+  }
+
   @Override
   public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq 
req) {
     return executeStatementV2(req);
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
index 0cbd94dc0a6..cda1762e917 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
@@ -18,8 +18,10 @@
  */
 package org.apache.iotdb.tsfile.read.filter;
 
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterSerializeId;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
 import org.apache.iotdb.tsfile.read.filter.operator.Between;
 import org.apache.iotdb.tsfile.read.filter.operator.Eq;
@@ -29,7 +31,11 @@ import org.apache.iotdb.tsfile.read.filter.operator.In;
 import org.apache.iotdb.tsfile.read.filter.operator.Lt;
 import org.apache.iotdb.tsfile.read.filter.operator.LtEq;
 import org.apache.iotdb.tsfile.read.filter.operator.NotEq;
+import org.apache.iotdb.tsfile.read.filter.operator.OrFilter;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -217,6 +223,89 @@ public class TimeFilter {
     }
   }
 
+  public static class TimeGtEqAndLt implements Filter {
+
+    private long startTime;
+
+    private long endTime;
+
+    public TimeGtEqAndLt() {}
+
+    public TimeGtEqAndLt(long startTime, long endTime) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+
+    @Override
+    public boolean satisfy(Statistics statistics) {
+      return !(statistics.getEndTime() < startTime || 
statistics.getStartTime() >= endTime);
+    }
+
+    @Override
+    public boolean allSatisfy(Statistics statistics) {
+      return startTime <= statistics.getStartTime() && statistics.getEndTime() 
< endTime;
+    }
+
+    @Override
+    public boolean satisfy(long time, Object value) {
+      return startTime <= time && time < endTime;
+    }
+
+    /**
+     * Tells whether the given time range overlaps this filter's window
+     * [this.startTime, this.endTime), i.e. whether some timestamp in the range could pass
+     * {@code satisfy(long, Object)}.
+     *
+     * @param startTime lower bound of the range being tested
+     * @param endTime upper bound of the range being tested (treated as inclusive, as in
+     *     containStartEndTime)
+     * @return false only when the range lies entirely before or entirely after the window
+     */
+    @Override
+    public boolean satisfyStartEndTime(long startTime, long endTime) {
+      // Overlap test, mirroring satisfy(Statistics). The previous condition required the range
+      // to be fully contained in the window, which duplicated containStartEndTime and wrongly
+      // rejected ranges that only partially intersect it.
+      return !(endTime < this.startTime || startTime >= this.endTime);
+    }
+
+    @Override
+    public boolean containStartEndTime(long startTime, long endTime) {
+      return this.startTime <= startTime && endTime < this.endTime;
+    }
+
+    @Override
+    public Filter copy() {
+      return new TimeGtEqAndLt(startTime, endTime);
+    }
+
+    @Override
+    public String toString() {
+      return "TimeGtEqAndLt{" + "startTime=" + startTime + ", endTime=" + 
endTime + '}';
+    }
+
+    @Override
+    public void serialize(DataOutputStream outputStream) {
+      try {
+        outputStream.write(getSerializeId().ordinal());
+        outputStream.writeLong(startTime);
+        outputStream.writeLong(endTime);
+      } catch (IOException ignored) {
+        // ignored
+      }
+    }
+
+    @Override
+    public void deserialize(ByteBuffer buffer) {
+      startTime = buffer.getLong();
+      endTime = buffer.getLong();
+    }
+
+    @Override
+    public FilterSerializeId getSerializeId() {
+      return FilterSerializeId.TIME_GTEQ_AND_LT;
+    }
+
+    @Override
+    public List<TimeRange> getTimeRanges() {
+      return startTime >= endTime
+          ? Collections.emptyList()
+          : Collections.singletonList(new TimeRange(startTime, endTime - 1));
+    }
+
+    @Override
+    public Filter reverse() {
+      return new OrFilter(new TimeLt(startTime), new TimeGtEq(endTime));
+    }
+  }
+
   /**
    * returns a default time filter by whether it's an ascending query.
    *
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
index 6d647b96c55..c3e1e9b3b82 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.read.filter.factory;
 
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.filter.operator.Between;
@@ -101,6 +102,9 @@ public class FilterFactory {
       case BETWEEN:
         filter = new Between<>();
         break;
+      case TIME_GTEQ_AND_LT:
+        filter = new TimeFilter.TimeGtEqAndLt();
+        break;
       default:
         throw new UnsupportedOperationException("Unknown filter type " + id);
     }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
index bb3c7e10608..912bc9f3360 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
@@ -34,5 +34,6 @@ public enum FilterSerializeId {
   IN,
   REGEXP,
   LIKE,
-  BETWEEN
+  BETWEEN,
+  TIME_GTEQ_AND_LT
 }

Reply via email to