This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch benchants_branch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9cced1296b03dcd76d4bd4a43a181869d18699fd
Author: Beyyes <[email protected]>
AuthorDate: Wed Jun 7 18:17:01 2023 +0800

    add executeGroupByQueryIntervalQuery rpc interface
---
 .../thrift/src/main/thrift/client.thrift           |  11 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  82 +++++++--
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 191 +++++++++++++++++++++
 .../iotdb/tsfile/read/filter/TimeFilter.java       |   2 +-
 4 files changed, 267 insertions(+), 19 deletions(-)

diff --git a/iotdb-protocol/thrift/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift/src/main/thrift/client.thrift
index ec35e8f5020..f5837ec84d3 100644
--- a/iotdb-protocol/thrift/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift/src/main/thrift/client.thrift
@@ -359,11 +359,12 @@ struct TSGroupByQueryIntervalReq {
   4: required string measurement
   5: required i32 dataType
   6: required common.TAggregationType aggregationType
-  7: optional i64 startTime
-  8: optional i64 endTime
-  9: optional i64 interval
-  10: optional i32 fetchSize
-  11: optional i64 timeout
+  7: optional string database
+  8: optional i64 startTime
+  9: optional i64 endTime
+  10: optional i64 interval
+  11: optional i32 fetchSize
+  12: optional i64 timeout
 }
 
 struct TSCreateMultiTimeseriesReq {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 5c3cba12996..22648b62d4d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -18,13 +18,19 @@
  */
 package org.apache.iotdb.db.service.thrift.impl;
 
+import io.jsonwebtoken.lang.Strings;
+import org.apache.commons.lang.StringUtils;
 import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
@@ -44,6 +50,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
@@ -86,6 +93,7 @@ import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTempla
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.protocol.rest.StringUtil;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.quotas.DataNodeThrottleQuotaManager;
@@ -94,6 +102,7 @@ import 
org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -164,9 +173,11 @@ import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -174,6 +185,7 @@ import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTim
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+import static 
org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
 import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@@ -193,6 +205,9 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, 
TimeUnit.MILLISECONDS);
 
+  private static final Semaphore querySemaphore =
+      new Semaphore(Runtime.getRuntime().availableProcessors() * 2);
+
   private final IPartitionFetcher partitionFetcher;
 
   private final ISchemaFetcher schemaFetcher;
@@ -213,8 +228,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   private static final SelectResult OLD_SELECT_RESULT =
       (resp, queryExecution, fetchSize) -> {
-        Pair<TSQueryDataSet, Boolean> pair =
-            QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, 
fetchSize);
+        Pair<TSQueryDataSet, Boolean> pair = 
convertTsBlockByFetchSize(queryExecution, fetchSize);
         resp.setQueryDataSet(pair.left);
         return pair.right;
       };
@@ -1262,7 +1276,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
         Pair<TSQueryDataSet, Boolean> pair =
-            QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, 
req.fetchSize);
+            convertTsBlockByFetchSize(queryExecution, req.fetchSize);
         TSQueryDataSet result = pair.left;
         finished = pair.right;
         boolean hasResultSet = result.bufferForTime().limit() != 0;
@@ -1848,28 +1862,70 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   @Override
   public TSExecuteStatementResp 
executeGroupByQueryIntervalQuery(TSGroupByQueryIntervalReq req)
       throws TException {
+
     try {
-      IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+      querySemaphore.acquire();
 
-      DataRegionId dataRegionId = new DataRegionId(5);
-      List<DataRegion> dataRegionList = null;
-      StorageEngine.getInstance().getDataRegion(dataRegionId);
+      IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
 
-      List<TsBlock> ret =
+      String database = req.getDatabase();
+      if (StringUtils.isEmpty(database)) {
+        String[] splits = Strings.split(req.getDevice(), "\\.");
+        database = String.format("%s.%s", splits[0], splits[1]);
+      }
+      String deviceId = req.getDevice();
+      String measurementId = req.getMeasurement();
+      TSDataType dataType = TSDataType.getTsDataType((byte) req.getDataType());
+
+      // only one database, one device, one time interval
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new 
HashMap<>();
+      TTimePartitionSlot timePartitionSlot =
+          TimePartitionUtils.getTimePartition(req.getStartTime());
+      DataPartitionQueryParam queryParam =
+          new DataPartitionQueryParam(
+              deviceId, Collections.singletonList(timePartitionSlot), false, 
false);
+      sgNameToQueryParamsMap.put(database, 
Collections.singletonList(queryParam));
+      DataPartition dataPartition = 
partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+      List<DataRegion> dataRegionList = new ArrayList<>();
+      List<TRegionReplicaSet> replicaSets =
+          dataPartition.getDataRegionReplicaSet(
+              deviceId, Collections.singletonList(timePartitionSlot));
+      for (TRegionReplicaSet region : replicaSets) {
+        dataRegionList.add(
+            StorageEngine.getInstance()
+                .getDataRegion(new 
DataRegionId(region.getRegionId().getId())));
+      }
+
+      List<TsBlock> blockResult =
           executeGroupByQueryInternal(
               SESSION_MANAGER.getSessionInfo(clientSession),
-              req.getDevice(),
-              req.getMeasurement(),
-              TSDataType.getTsDataType((byte) 2),
+              deviceId,
+              measurementId,
+              dataType,
               req.getStartTime(),
               req.getEndTime(),
               req.getInterval(),
               req.getAggregationType(),
               dataRegionList);
-    } catch (Exception e) {
 
+      String outputColumnName = req.getAggregationType().name();
+      List<ColumnHeader> columnHeaders =
+          Collections.singletonList(new ColumnHeader(outputColumnName, 
dataType));
+      DatasetHeader header = new DatasetHeader(columnHeaders, false);
+      
header.setColumnToTsBlockIndexMap(Collections.singletonList(outputColumnName));
+
+      TSExecuteStatementResp resp = createResponse(header, 1);
+      TSQueryDataSet queryDataSet = convertTsBlockByFetchSize(blockResult);
+      resp.setQueryDataSet(queryDataSet);
+
+      return resp;
+    } catch (Exception e) {
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + 
OperationType.EXECUTE_AGG_QUERY));
+    } finally {
+      querySemaphore.release();
+      SESSION_MANAGER.updateIdleTime();
     }
-    return null;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 6baee727383..9aa5c0b8481 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -244,6 +244,197 @@ public class QueryDataSetUtils {
     return new Pair<>(tsQueryDataSet, finished);
   }
 
+  public static TSQueryDataSet convertTsBlockByFetchSize(List<TsBlock> 
tsBlocks)
+      throws IOException {
+    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+
+    // one time column and each value column has an actual value buffer and a 
bitmap value to
+    // indicate whether it is a null
+    int columnNum = 1;
+    int columnNumWithTime = columnNum * 2 + 1;
+    DataOutputStream[] dataOutputStreams = new 
DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new 
ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    int[] valueOccupation = new int[columnNum];
+
+    // used to record a bitmap for every 8 points
+    int[] bitmaps = new int[columnNum];
+    for (TsBlock tsBlock : tsBlocks) {
+      if (tsBlock.isEmpty()) {
+        continue;
+      }
+
+      int currentCount = tsBlock.getPositionCount();
+      // serialize time column
+      for (int i = 0; i < currentCount; i++) {
+        // use columnOutput to write byte array
+        dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+      }
+
+      // serialize each value column and its bitmap
+      for (int k = 0; k < columnNum; k++) {
+        // get DataOutputStream for current value column and its bitmap
+        DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 
1)];
+
+        Column column = tsBlock.getColumn(k);
+        TSDataType type = column.getDataType();
+        switch (type) {
+          case INT32:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeInt(column.getInt(i));
+                valueOccupation[k] += 4;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case INT64:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeLong(column.getLong(i));
+                valueOccupation[k] += 8;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case FLOAT:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeFloat(column.getFloat(i));
+                valueOccupation[k] += 4;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeDouble(column.getDouble(i));
+                valueOccupation[k] += 8;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case BOOLEAN:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeBoolean(column.getBoolean(i));
+                valueOccupation[k] += 1;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case TEXT:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                Binary binary = column.getBinary(i);
+                dataOutputStream.writeInt(binary.getLength());
+                dataOutputStream.write(binary.getValues());
+                valueOccupation[k] = valueOccupation[k] + 4 + 
binary.getLength();
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", type));
+        }
+        if (k != columnNum - 1) {
+          rowCount -= currentCount;
+        }
+      }
+    }
+    // feed the remaining bitmap
+    int remaining = rowCount % 8;
+    for (int k = 0; k < columnNum; k++) {
+      if (remaining != 0) {
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 
1)];
+        dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
+      }
+    }
+
+    // calculate the time buffer size
+    int timeOccupation = rowCount * 8;
+    ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+    timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+    timeBuffer.flip();
+    tsQueryDataSet.setTime(timeBuffer);
+
+    // calculate the bitmap buffer size
+    int bitmapOccupation = (rowCount + 7) / 8;
+
+    List<ByteBuffer> bitmapList = new LinkedList<>();
+    List<ByteBuffer> valueList = new LinkedList<>();
+    for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 
2]);
+      valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+      valueBuffer.flip();
+      valueList.add(valueBuffer);
+
+      ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+      bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
+      bitmapBuffer.flip();
+      bitmapList.add(bitmapBuffer);
+    }
+    tsQueryDataSet.setBitmapList(bitmapList);
+    tsQueryDataSet.setValueList(valueList);
+    return tsQueryDataSet;
+  }
+
   /** pair.left is serialized TsBlock pair.right indicates if the query 
finished */
   // To fetch required amounts of data and combine them through List
   public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize(
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
index cda1762e917..b42f5e7ca3b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
@@ -253,7 +253,7 @@ public class TimeFilter {
 
     @Override
     public boolean satisfyStartEndTime(long startTime, long endTime) {
-      return !(startTime < this.startTime || endTime >= this.endTime);
+      return !(endTime < this.startTime || startTime >= this.endTime);
     }
 
     @Override

Reply via email to