This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch tsbs/iot
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tsbs/iot by this push:
     new 77c8ae3a674 support Q1-Q11
77c8ae3a674 is described below

commit 77c8ae3a674cd6e2af018b67407839aa2da89561
Author: JackieTien97 <[email protected]>
AuthorDate: Sun Apr 28 19:09:32 2024 +0800

    support Q1-Q11
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 1201 ++++++++++++++++++++
 .../queryengine/plan/statement/StatementType.java  |   10 +
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  625 ++++++++++
 .../thrift-datanode/src/main/thrift/client.thrift  |   97 ++
 4 files changed, 1933 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index fb8fc40ba46..320e8fc7bfb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -121,7 +121,12 @@ import 
org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSAvgDailyDrivingDurationReq;
+import org.apache.iotdb.service.rpc.thrift.TSAvgDailyDrivingSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSAvgLoadReq;
+import org.apache.iotdb.service.rpc.thrift.TSAvgVsProjectedFuelConsumptionReq;
 import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
+import org.apache.iotdb.service.rpc.thrift.TSBreakdownFrequencyReq;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
@@ -130,6 +135,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSDailyActivityReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
@@ -142,6 +148,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq;
+import org.apache.iotdb.service.rpc.thrift.TSHighLoadReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -151,6 +158,8 @@ import 
org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSLongDailySessionsReq;
+import org.apache.iotdb.service.rpc.thrift.TSLongDrivingSessionsReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
@@ -165,6 +174,8 @@ import 
org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Bytes;
 import io.airlift.units.Duration;
 import io.jsonwebtoken.lang.Strings;
 import org.apache.commons.lang3.StringUtils;
@@ -191,6 +202,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -202,6 +214,13 @@ import static 
org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
 import static 
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_DAILY_DRIVING_DURATION;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_DAILY_DRIVING_SESSION;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_LOAD;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_VS_PROJECTED_FUEL_CONSUMPTION;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.DAILY_ACTIVITY;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.HIGH_LOAD;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.LONG_DRIVING_SESSIONS;
 import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
@@ -238,6 +257,258 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
       TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
 
+  // Column names shared by the fixed query templates and result headers declared below.
+  private static final String NAME_COLUMN_NAME = "name";
+
+  // NOTE(review): upper-case, unlike every other column name in this group - confirm intended.
+  private static final String DRIVER_COLUMN_NAME = "DRIVER";
+
+  private static final String CURRENT_LOAD_COLUMN_NAME = "current_load";
+
+  private static final String LOAD_CAPACITY_COLUMN_NAME = "load_capacity";
+
+  private static final String VELOCITY_COLUMN_NAME = "velocity";
+
+  private static final String FUEL_CONSUMPTION_COLUMN_NAME = 
"fuel_consumption";
+
+  private static final String NOMINAL_FUEL_CONSUMPTION_COLUMN_NAME = 
"nominal_fuel_consumption";
+
+  private static final String FLEET_COLUMN_NAME = "fleet";
+
+  private static final String MODEL_COLUMN_NAME = "model";
+
+  private static final String STATUS_COLUMN_NAME = "status";
+
+  // SQL template for highLoad: a "last" query over current_load for every series under one
+  // fleet. The single %s placeholder is the fleet node name. The trailing space after "last" is
+  // required; without it the keyword fuses with the column name ("lastcurrent_load").
+  private static final String HIGH_LOAD_SQL_TEMPLATE =
+      "select last " + CURRENT_LOAD_COLUMN_NAME + " from root.diagnostics.%s.**";
+
+  // Column names, in order, that highLoad sets on its response.
+  private static final List<String> HIGH_LOAD_HEADERS =
+      ImmutableList.of(
+          NAME_COLUMN_NAME,
+          DRIVER_COLUMN_NAME,
+          CURRENT_LOAD_COLUMN_NAME,
+          LOAD_CAPACITY_COLUMN_NAME);
+
+  // String form of each header column's data type, in header order.
+  private static final List<String> HIGH_LOAD_HEADER_DATA_TYPES =
+      ImmutableList.of(
+          TSDataType.TEXT.toString(),
+          TSDataType.TEXT.toString(),
+          TSDataType.DOUBLE.toString(),
+          TSDataType.DOUBLE.toString());
+
+  // The same column types as TSDataType values; public, so presumably read outside this class.
+  public static final List<TSDataType> HIGH_LOAD_DATA_TYPES =
+      ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.DOUBLE, 
TSDataType.DOUBLE);
+
+  // Header name -> position in HIGH_LOAD_HEADERS; filled once by the static block below.
+  private static final Map<String, Integer> HIGH_LOAD_COLUMN_NAME_INDEX_MAP = 
new HashMap<>();
+
+  static {
+    for (int i = 0; i < HIGH_LOAD_HEADERS.size(); i++) {
+      HIGH_LOAD_COLUMN_NAME_INDEX_MAP.put(HIGH_LOAD_HEADERS.get(i), i);
+    }
+  }
+
+  // Alias-column bitmap as a byte list; a fresh BitSet serialises to a zero-length array, so
+  // this list is empty.
+  private static final List<Byte> HIGH_LOAD_ALIAS_COLUMNS =
+      new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+  // SQL template shared by longDrivingSessions and longDailySessions: average velocity per
+  // 10-minute window for every device under one fleet, keeping only windows whose average is
+  // above 1. Placeholders: %s fleet node name, %d start time, %d end time.
+  // HAVING is placed before ALIGN BY DEVICE, matching the other templates in this class.
+  private static final String LONG_DRIVING_SESSIONS_SQL_TEMPLATE =
+      "select avg("
+          + VELOCITY_COLUMN_NAME
+          + ") from root.readings.%s.** GROUP BY([%d, %d), 10m) HAVING avg("
+          + VELOCITY_COLUMN_NAME
+          + ") > 1 ALIGN BY DEVICE";
+
+  // Column names, in order, that longDrivingSessions and longDailySessions set on the response.
+  private static final List<String> LONG_DRIVING_SESSIONS_HEADERS =
+      ImmutableList.of(NAME_COLUMN_NAME, DRIVER_COLUMN_NAME);
+
+  // String form of each header column's data type, in header order.
+  private static final List<String> LONG_DRIVING_SESSIONS_HEADER_DATA_TYPES =
+      ImmutableList.of(TSDataType.TEXT.toString(), TSDataType.TEXT.toString());
+
+  // The same column types as TSDataType values; public, so presumably read outside this class.
+  public static final List<TSDataType> LONG_DRIVING_SESSIONS_DATA_TYPES =
+      ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT);
+
+  // Header name -> position in LONG_DRIVING_SESSIONS_HEADERS; filled by the static block below.
+  private static final Map<String, Integer> 
LONG_DRIVING_SESSIONS_COLUMN_NAME_INDEX_MAP =
+      new HashMap<>();
+
+  static {
+    for (int i = 0; i < LONG_DRIVING_SESSIONS_HEADERS.size(); i++) {
+      
LONG_DRIVING_SESSIONS_COLUMN_NAME_INDEX_MAP.put(LONG_DRIVING_SESSIONS_HEADERS.get(i),
 i);
+    }
+  }
+
+  // Alias-column bitmap as a byte list; empty because the BitSet has no bits set.
+  private static final List<Byte> LONG_DRIVING_SESSIONS_ALIAS_COLUMNS =
+      new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+  // Fixed SQL (no placeholders) used by avgVsProjectedFuelConsumption: per-device sum and count
+  // of fuel_consumption over rows where velocity > 1.
+  private static final String AVG_VS_PROJ_FUEL_CONSUMPTION_SQL_TEMPLATE =
+      "select sum("
+          + FUEL_CONSUMPTION_COLUMN_NAME
+          + "), count("
+          + FUEL_CONSUMPTION_COLUMN_NAME
+          + ") from root.readings.** where "
+          + VELOCITY_COLUMN_NAME
+          + " > 1 ALIGN BY DEVICE";
+
+  // Column names, in order, set on the response; the middle one is "avg_fuel_consumption".
+  private static final List<String> AVG_VS_PROJ_FUEL_CONSUMPTION_HEADERS =
+      ImmutableList.of(
+          FLEET_COLUMN_NAME,
+          "avg_" + FUEL_CONSUMPTION_COLUMN_NAME,
+          NOMINAL_FUEL_CONSUMPTION_COLUMN_NAME);
+
+  // String form of each header column's data type, in header order.
+  private static final List<String> 
AVG_VS_PROJ_FUEL_CONSUMPTION_HEADER_DATA_TYPES =
+      ImmutableList.of(
+          TSDataType.TEXT.toString(), TSDataType.DOUBLE.toString(), 
TSDataType.DOUBLE.toString());
+
+  // The same column types as TSDataType values; public, so presumably read outside this class.
+  public static final List<TSDataType> AVG_VS_PROJ_FUEL_CONSUMPTION_DATA_TYPES 
=
+      ImmutableList.of(TSDataType.TEXT, TSDataType.DOUBLE, TSDataType.DOUBLE);
+
+  // Header name -> position in the header list; filled once by the static block below.
+  private static final Map<String, Integer> 
AVG_VS_PROJ_FUEL_CONSUMPTION_COLUMN_NAME_INDEX_MAP =
+      new HashMap<>();
+
+  static {
+    for (int i = 0; i < AVG_VS_PROJ_FUEL_CONSUMPTION_HEADERS.size(); i++) {
+      AVG_VS_PROJ_FUEL_CONSUMPTION_COLUMN_NAME_INDEX_MAP.put(
+          AVG_VS_PROJ_FUEL_CONSUMPTION_HEADERS.get(i), i);
+    }
+  }
+
+  // Alias-column bitmap as a byte list; empty because the BitSet has no bits set.
+  private static final List<Byte> AVG_VS_PROJ_FUEL_CONSUMPTION_ALIAS_COLUMNS =
+      new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+  // SQL template: average velocity per 10-minute window for every device under root.readings,
+  // keeping only windows whose average is above 1. Placeholders: %d start time, %d end time.
+  private static final String AVG_DAILY_DRIVING_DURATION_SQL_TEMPLATE =
+      "select avg("
+          + VELOCITY_COLUMN_NAME
+          + ") from root.readings.** GROUP BY([%d, %d), 10m) HAVING avg("
+          + VELOCITY_COLUMN_NAME
+          + ") > 1 ALIGN BY DEVICE";
+
+  // Result column names, in order.
+  private static final List<String> AVG_DAILY_DRIVING_DURATION_HEADERS =
+      ImmutableList.of(FLEET_COLUMN_NAME, NAME_COLUMN_NAME, 
DRIVER_COLUMN_NAME, "avg_daily_hours");
+
+  // String form of each header column's data type, in header order.
+  private static final List<String> 
AVG_DAILY_DRIVING_DURATION_HEADER_DATA_TYPES =
+      ImmutableList.of(
+          TSDataType.TEXT.toString(),
+          TSDataType.TEXT.toString(),
+          TSDataType.TEXT.toString(),
+          TSDataType.DOUBLE.toString());
+
+  // The same column types as TSDataType values; public, so presumably read outside this class.
+  public static final List<TSDataType> AVG_DAILY_DRIVING_DURATION_DATA_TYPES =
+      ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, 
TSDataType.DOUBLE);
+
+  // Header name -> position in the header list; filled once by the static block below.
+  // NOTE(review): "DURATIONN" (double N) looks like a typo; renaming needs every use updated.
+  private static final Map<String, Integer> 
AVG_DAILY_DRIVING_DURATIONN_COLUMN_NAME_INDEX_MAP =
+      new HashMap<>();
+
+  static {
+    for (int i = 0; i < AVG_DAILY_DRIVING_DURATION_HEADERS.size(); i++) {
+      AVG_DAILY_DRIVING_DURATIONN_COLUMN_NAME_INDEX_MAP.put(
+          AVG_DAILY_DRIVING_DURATION_HEADERS.get(i), i);
+    }
+  }
+
+  // Alias-column bitmap as a byte list; empty because the BitSet has no bits set.
+  // NOTE(review): same "DURATIONN" spelling as the index map above.
+  private static final List<Byte> AVG_DAILY_DRIVING_DURATIONN_ALIAS_COLUMNS =
+      new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+  // SQL template: average velocity per 10-minute window for every device under root.readings,
+  // with no HAVING filter. Placeholders: %d start time, %d end time.
+  private static final String AVG_DAILY_DRIVING_SESSION_SQL_TEMPLATE =
+      "select avg("
+          + VELOCITY_COLUMN_NAME
+          + ") from root.readings.** GROUP BY([%d, %d), 10m) ALIGN BY DEVICE";
+
+  // Result column names, in order.
+  // NOTE(review): the second header says "duration" although this is the "session" query -
+  // confirm the name is what clients expect.
+  private static final List<String> AVG_DAILY_DRIVING_SESSION_HEADERS =
+      ImmutableList.of(NAME_COLUMN_NAME, "avg_daily_driving_duration");
+
+  // String form of each header column's data type, in header order.
+  private static final List<String> 
AVG_DAILY_DRIVING_SESSION_HEADER_DATA_TYPES =
+      ImmutableList.of(TSDataType.TEXT.toString(), 
TSDataType.INT64.toString());
+
+  // The same column types as TSDataType values; public, so presumably read outside this class.
+  public static final List<TSDataType> AVG_DAILY_DRIVING_SESSION_DATA_TYPES =
+      ImmutableList.of(TSDataType.TEXT, TSDataType.INT64);
+
+  // Header name -> position in the header list; filled once by the static block below.
+  private static final Map<String, Integer> 
AVG_DAILY_DRIVING_SESSION_COLUMN_NAME_INDEX_MAP =
+      new HashMap<>();
+
+  static {
+    for (int i = 0; i < AVG_DAILY_DRIVING_SESSION_HEADERS.size(); i++) {
+      AVG_DAILY_DRIVING_SESSION_COLUMN_NAME_INDEX_MAP.put(
+          AVG_DAILY_DRIVING_SESSION_HEADERS.get(i), i);
+    }
+  }
+
+  // Alias-column bitmap as a byte list; empty because the BitSet has no bits set.
+  private static final List<Byte> AVG_DAILY_DRIVING_SESSION_ALIAS_COLUMNS =
+      new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+  // Fixed SQL (no placeholders): per-device average of current_load under root.diagnostics.
+  private static final String AVG_LOAD_SQL_TEMPLATE =
+      "select avg(" + CURRENT_LOAD_COLUMN_NAME + ") from root.diagnostics.** 
ALIGN BY DEVICE";
+
+  // Result column names, in order.
+  private static final List<String> AVG_LOAD_HEADERS =
+      ImmutableList.of(
+          FLEET_COLUMN_NAME, MODEL_COLUMN_NAME, LOAD_CAPACITY_COLUMN_NAME, 
"avg_load_percentage");
+
+  // String form of each header column's data type, in header order.
+  // NOTE(review): load_capacity is TEXT here but DOUBLE in the high-load header - confirm.
+  private static final List<String> AVG_LOAD_HEADER_DATA_TYPES =
+      ImmutableList.of(
+          TSDataType.TEXT.toString(),
+          TSDataType.TEXT.toString(),
+          TSDataType.TEXT.toString(),
+          TSDataType.DOUBLE.toString());
+
+  // The same column types as TSDataType values; public, so presumably read outside this class.
+  public static final List<TSDataType> AVG_LOAD_DATA_TYPES =
+      ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, 
TSDataType.DOUBLE);
+
+  // Header name -> position in AVG_LOAD_HEADERS; filled once by the static block below.
+  private static final Map<String, Integer> AVG_LOAD_COLUMN_NAME_INDEX_MAP = 
new HashMap<>();
+
+  static {
+    for (int i = 0; i < AVG_LOAD_HEADERS.size(); i++) {
+      AVG_LOAD_COLUMN_NAME_INDEX_MAP.put(AVG_LOAD_HEADERS.get(i), i);
+    }
+  }
+
+  // Alias-column bitmap as a byte list; empty because the BitSet has no bits set.
+  private static final List<Byte> AVG_LOAD_ALIAS_COLUMNS =
+      new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+  // SQL template: average status per 10-minute window for every device under root.diagnostics,
+  // keeping only windows whose average is below 1. Placeholders: %d start time, %d end time.
+  private static final String DAILY_ACTIVITY_SQL_TEMPLATE =
+      "select avg("
+          + STATUS_COLUMN_NAME
+          + ") from root.diagnostics.** GROUP BY([%d, %d), 10m) HAVING avg("
+          + STATUS_COLUMN_NAME
+          + ") < 1 ALIGN BY DEVICE";
+
+  // Result column names, in order.
+  private static final List<String> DAILY_ACTIVITY_HEADERS =
+      ImmutableList.of(FLEET_COLUMN_NAME, MODEL_COLUMN_NAME, "daily_activity");
+
+  // String form of each header column's data type, in header order.
+  private static final List<String> DAILY_ACTIVITY_HEADER_DATA_TYPES =
+      ImmutableList.of(
+          TSDataType.TEXT.toString(), TSDataType.TEXT.toString(), 
TSDataType.INT64.toString());
+
+  // The same column types as TSDataType values; public, so presumably read outside this class.
+  public static final List<TSDataType> DAILY_ACTIVITY_DATA_TYPES =
+      ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT64);
+
+  // Header name -> position in DAILY_ACTIVITY_HEADERS; filled once by the static block below.
+  private static final Map<String, Integer> 
DAILY_ACTIVITY_COLUMN_NAME_INDEX_MAP = new HashMap<>();
+
+  static {
+    for (int i = 0; i < DAILY_ACTIVITY_HEADERS.size(); i++) {
+      DAILY_ACTIVITY_COLUMN_NAME_INDEX_MAP.put(DAILY_ACTIVITY_HEADERS.get(i), 
i);
+    }
+  }
+
+  // Alias-column bitmap as a byte list; empty because the BitSet has no bits set.
+  private static final List<Byte> DAILY_ACTIVITY_ALIAS_COLUMNS =
+      new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+  // Static attributes per device, passed to the QueryDataSetUtils result builders.
+  // NOTE(review): no code in the visible hunks populates this map, and it is a plain HashMap -
+  // confirm where it is filled and that it is not mutated while queries read it.
+  private static final Map<String, DeviceAttributes> DEVICE_ATTRIBUTES_MAP = 
new HashMap<>();
+
+  /**
+   * Immutable holder for three numeric attributes of a device. Instances are the values of
+   * {@code DEVICE_ATTRIBUTES_MAP}.
+   */
+  public static class DeviceAttributes {
+    /** Nominal fuel consumption of the device (units not stated in this file). */
+    public final double nominalFuelConsumption;
+
+    /** Load capacity of the device (units not stated in this file). */
+    public final double loadCapacity;
+
+    /** Fuel capacity of the device (units not stated in this file). */
+    public final double fuelCapacity;
+
+    /**
+     * Stores the three values as given; no validation is performed.
+     *
+     * @param nominalFuelConsumption value for {@link #nominalFuelConsumption}
+     * @param loadCapacity value for {@link #loadCapacity}
+     * @param fuelCapacity value for {@link #fuelCapacity}
+     */
+    public DeviceAttributes(
+        double nominalFuelConsumption, double loadCapacity, double 
fuelCapacity) {
+      this.nominalFuelConsumption = nominalFuelConsumption;
+      this.loadCapacity = loadCapacity;
+      this.fuelCapacity = fuelCapacity;
+    }
+  }
+
+  // Presumably zero-based indexes of the fleet/model/name/driver nodes within a device path
+  // (e.g. root.<db>.<fleet>.<model>.<name>.<driver>) - TODO confirm against the code that
+  // reads these constants.
+  public static final int FLEET_LEVEL = 2;
+  public static final int MODEL_LEVEL = 3;
+  public static final int NAME_LEVEL = 4;
+  public static final int DRIVER_LEVEL = 5;
+
   @FunctionalInterface
   public interface SelectResult {
     boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, 
int fetchSize)
@@ -2184,6 +2455,936 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     return executeAggregationQueryInternal(req, OLD_SELECT_RESULT);
   }
 
+  /**
+   * Runs the fixed "high load" query for one fleet and returns it as a statement response.
+   *
+   * <p>The SQL text is {@code HIGH_LOAD_SQL_TEMPLATE} formatted with {@code req.fleet}. The
+   * statement is parsed, checked for authority and quota, and executed through the coordinator.
+   * On success the response carries the fixed {@code HIGH_LOAD_HEADERS} and the rows built by
+   * {@code QueryDataSetUtils.constructHighLoadResult}.
+   *
+   * <p>NOTE(review): {@code req.fleet} is inserted into the SQL text without escaping or
+   * validation - confirm callers are trusted, or validate it as a single path node.
+   *
+   * @param req request supplying the fleet, the statement id and the timeout
+   * @return the query response, or a response carrying only a status when the session is not
+   *     logged in, the statement cannot be created, a check fails, or an exception is caught
+   * @throws TException declared by the RPC interface
+   */
+  @Override
+  public TSExecuteStatementResp highLoad(TSHighLoadReq req) throws TException {
+    boolean finished = false;
+    // Sentinel until a real id is requested; the finally block uses it as-is on early exits.
+    long queryId = Long.MIN_VALUE;
+    String statement = String.format(HIGH_LOAD_SQL_TEMPLATE, req.fleet);
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    // quota
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    // Only used as a "statement was parsed and passed the checks" flag in the finally block.
+    StatementType statementType = null;
+    Throwable t = null;
+    try {
+      Statement s = StatementGenerator.createStatement(statement, 
clientSession.getZoneId());
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not 
supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      statementType = s.getType();
+      if (ENABLE_AUDIT_LOG) {
+        AuditLogger.log(statement, s);
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.executeForTreeModel(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              statement,
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      // Anything other than success or a redirection recommendation is returned as a failure.
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = 
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+          // The header is the fixed high-load header, not one derived from the executed SQL.
+          resp.setColumnNameIndexMap(HIGH_LOAD_COLUMN_NAME_INDEX_MAP);
+          resp.setSgColumns(Collections.emptyList());
+          resp.setColumns(HIGH_LOAD_HEADERS);
+          resp.setDataTypeList(HIGH_LOAD_HEADER_DATA_TYPES);
+          resp.setAliasColumns(HIGH_LOAD_ALIAS_COLUMNS);
+          resp.setIgnoreTimeStamp(false);
+          resp.setQueryId(queryId);
+          resp.setStatus(result.status);
+          // pair.left holds the serialized result buffers; pair.right is the "finished" flag.
+          Pair<List<ByteBuffer>, Boolean> pair =
+              QueryDataSetUtils.constructHighLoadResult(
+                  queryExecution, DEVICE_ATTRIBUTES_MAP, serde);
+          finished = pair.right;
+          resp.setQueryResult(pair.left);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      // Mark the query finished so the finally block cleans it up.
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + 
OperationType.EXECUTE_STATEMENT));
+    } catch (Error error) {
+      // Errors are rethrown; finished stays false, so no cleanup is triggered in finally.
+      t = error;
+      throw error;
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+      // record each operation time cost
+      if (statementType != null) {
+        addStatementExecutionLatency(
+            OperationType.EXECUTE_QUERY_STATEMENT, HIGH_LOAD.name(), 
currentOperationCost);
+      }
+
+      if (finished) {
+        // record total time cost for one query
+        long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+        addQueryLatency(HIGH_LOAD, executionTime > 0 ? executionTime : 
currentOperationCost);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
+      }
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
+  /**
+   * Runs the fixed "long driving sessions" query for one fleet and time range.
+   *
+   * <p>The SQL text is {@code LONG_DRIVING_SESSIONS_SQL_TEMPLATE} formatted with {@code
+   * req.fleet}, {@code req.startTime} and {@code req.endTime}. The statement is parsed, checked
+   * for authority and quota, and executed through the coordinator. On success the response
+   * carries the fixed {@code LONG_DRIVING_SESSIONS_HEADERS} and the rows built by {@code
+   * QueryDataSetUtils.constructLongDrivingSessionsResult}.
+   *
+   * <p>NOTE(review): {@code req.fleet} is inserted into the SQL text without escaping or
+   * validation - confirm callers are trusted, or validate it as a single path node.
+   *
+   * @param req request supplying the fleet, the time range, the statement id and the timeout
+   * @return the query response, or a response carrying only a status when the session is not
+   *     logged in, the statement cannot be created, a check fails, or an exception is caught
+   * @throws TException declared by the RPC interface
+   */
+  @Override
+  public TSExecuteStatementResp longDrivingSessions(TSLongDrivingSessionsReq 
req)
+      throws TException {
+    boolean finished = false;
+    // Sentinel until a real id is requested; the finally block uses it as-is on early exits.
+    long queryId = Long.MIN_VALUE;
+    String statement =
+        String.format(LONG_DRIVING_SESSIONS_SQL_TEMPLATE, req.fleet, 
req.startTime, req.endTime);
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    // quota
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    // Only used as a "statement was parsed and passed the checks" flag in the finally block.
+    StatementType statementType = null;
+    Throwable t = null;
+    try {
+      Statement s = StatementGenerator.createStatement(statement, 
clientSession.getZoneId());
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not 
supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      statementType = s.getType();
+      if (ENABLE_AUDIT_LOG) {
+        AuditLogger.log(statement, s);
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.executeForTreeModel(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              statement,
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      // Anything other than success or a redirection recommendation is returned as a failure.
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = 
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+          // The header is the fixed two-column header, not one derived from the executed SQL.
+          
resp.setColumnNameIndexMap(LONG_DRIVING_SESSIONS_COLUMN_NAME_INDEX_MAP);
+          resp.setSgColumns(Collections.emptyList());
+          resp.setColumns(LONG_DRIVING_SESSIONS_HEADERS);
+          resp.setDataTypeList(LONG_DRIVING_SESSIONS_HEADER_DATA_TYPES);
+          resp.setAliasColumns(LONG_DRIVING_SESSIONS_ALIAS_COLUMNS);
+          resp.setIgnoreTimeStamp(true);
+          resp.setQueryId(queryId);
+          resp.setStatus(result.status);
+          // NOTE(review): 22 is presumably a threshold on the number of 10-minute windows per
+          // device - confirm against constructLongDrivingSessionsResult.
+          Pair<List<ByteBuffer>, Boolean> pair =
+              
QueryDataSetUtils.constructLongDrivingSessionsResult(queryExecution, serde, 22);
+          finished = pair.right;
+          resp.setQueryResult(pair.left);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      // Mark the query finished so the finally block cleans it up.
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + 
OperationType.EXECUTE_STATEMENT));
+    } catch (Error error) {
+      // Errors are rethrown; finished stays false, so no cleanup is triggered in finally.
+      t = error;
+      throw error;
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+      // record each operation time cost
+      if (statementType != null) {
+        addStatementExecutionLatency(
+            OperationType.EXECUTE_QUERY_STATEMENT,
+            LONG_DRIVING_SESSIONS.name(),
+            currentOperationCost);
+      }
+
+      if (finished) {
+        // record total time cost for one query
+        long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+        addQueryLatency(
+            LONG_DRIVING_SESSIONS, executionTime > 0 ? executionTime : 
currentOperationCost);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
+      }
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
+  /**
+   * Runs the fixed "long daily sessions" query for one fleet and time range.
+   *
+   * <p>This method uses the same SQL template ({@code LONG_DRIVING_SESSIONS_SQL_TEMPLATE}), the
+   * same response header constants and the same result builder as {@code longDrivingSessions};
+   * the only difference visible here is the third argument passed to {@code
+   * QueryDataSetUtils.constructLongDrivingSessionsResult} (60 instead of 22).
+   *
+   * <p>NOTE(review): latency is also recorded under {@code LONG_DRIVING_SESSIONS}, so the two
+   * queries are indistinguishable in metrics - confirm whether a dedicated statement type
+   * should be used.
+   *
+   * <p>NOTE(review): {@code req.fleet} is inserted into the SQL text without escaping or
+   * validation - confirm callers are trusted, or validate it as a single path node.
+   *
+   * @param req request supplying the fleet, the time range, the statement id and the timeout
+   * @return the query response, or a response carrying only a status when the session is not
+   *     logged in, the statement cannot be created, a check fails, or an exception is caught
+   * @throws TException declared by the RPC interface
+   */
+  @Override
+  public TSExecuteStatementResp longDailySessions(TSLongDailySessionsReq req) 
throws TException {
+    boolean finished = false;
+    // Sentinel until a real id is requested; the finally block uses it as-is on early exits.
+    long queryId = Long.MIN_VALUE;
+    String statement =
+        String.format(LONG_DRIVING_SESSIONS_SQL_TEMPLATE, req.fleet, 
req.startTime, req.endTime);
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    // quota
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    // Only used as a "statement was parsed and passed the checks" flag in the finally block.
+    StatementType statementType = null;
+    Throwable t = null;
+    try {
+      Statement s = StatementGenerator.createStatement(statement, 
clientSession.getZoneId());
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not 
supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      statementType = s.getType();
+      if (ENABLE_AUDIT_LOG) {
+        AuditLogger.log(statement, s);
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.executeForTreeModel(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              statement,
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      // Anything other than success or a redirection recommendation is returned as a failure.
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = 
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+          // The header is the fixed two-column header, not one derived from the executed SQL.
+          
resp.setColumnNameIndexMap(LONG_DRIVING_SESSIONS_COLUMN_NAME_INDEX_MAP);
+          resp.setSgColumns(Collections.emptyList());
+          resp.setColumns(LONG_DRIVING_SESSIONS_HEADERS);
+          resp.setDataTypeList(LONG_DRIVING_SESSIONS_HEADER_DATA_TYPES);
+          resp.setAliasColumns(LONG_DRIVING_SESSIONS_ALIAS_COLUMNS);
+          resp.setIgnoreTimeStamp(true);
+          resp.setQueryId(queryId);
+          resp.setStatus(result.status);
+          // NOTE(review): 60 is presumably a threshold on the number of 10-minute windows per
+          // device - confirm against constructLongDrivingSessionsResult.
+          Pair<List<ByteBuffer>, Boolean> pair =
+              
QueryDataSetUtils.constructLongDrivingSessionsResult(queryExecution, serde, 60);
+          finished = pair.right;
+          resp.setQueryResult(pair.left);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      // Mark the query finished so the finally block cleans it up.
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + 
OperationType.EXECUTE_STATEMENT));
+    } catch (Error error) {
+      // Errors are rethrown; finished stays false, so no cleanup is triggered in finally.
+      t = error;
+      throw error;
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+      // record each operation time cost
+      if (statementType != null) {
+        addStatementExecutionLatency(
+            OperationType.EXECUTE_QUERY_STATEMENT,
+            LONG_DRIVING_SESSIONS.name(),
+            currentOperationCost);
+      }
+
+      if (finished) {
+        // record total time cost for one query
+        long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+        addQueryLatency(
+            LONG_DRIVING_SESSIONS, executionTime > 0 ? executionTime : 
currentOperationCost);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
+      }
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp avgVsProjectedFuelConsumption(
+      TSAvgVsProjectedFuelConsumptionReq req) throws TException {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    String statement = AVG_VS_PROJ_FUEL_CONSUMPTION_SQL_TEMPLATE;
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    // quota
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    StatementType statementType = null;
+    Throwable t = null;
+    try {
+      Statement s = StatementGenerator.createStatement(statement, 
clientSession.getZoneId());
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not 
supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      statementType = s.getType();
+      if (ENABLE_AUDIT_LOG) {
+        AuditLogger.log(statement, s);
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.executeForTreeModel(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              statement,
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = 
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+          
resp.setColumnNameIndexMap(AVG_VS_PROJ_FUEL_CONSUMPTION_COLUMN_NAME_INDEX_MAP);
+          resp.setSgColumns(Collections.emptyList());
+          resp.setColumns(AVG_VS_PROJ_FUEL_CONSUMPTION_HEADERS);
+          resp.setDataTypeList(AVG_VS_PROJ_FUEL_CONSUMPTION_HEADER_DATA_TYPES);
+          resp.setAliasColumns(AVG_VS_PROJ_FUEL_CONSUMPTION_ALIAS_COLUMNS);
+          resp.setIgnoreTimeStamp(true);
+          resp.setQueryId(queryId);
+          resp.setStatus(result.status);
+          Pair<List<ByteBuffer>, Boolean> pair =
+              QueryDataSetUtils.constructAvgVsProjectedFuelConsumptionResult(
+                  queryExecution, DEVICE_ATTRIBUTES_MAP, serde);
+          finished = pair.right;
+          resp.setQueryResult(pair.left);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + 
OperationType.EXECUTE_STATEMENT));
+    } catch (Error error) {
+      t = error;
+      throw error;
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+      // record each operation time cost
+      if (statementType != null) {
+        addStatementExecutionLatency(
+            OperationType.EXECUTE_QUERY_STATEMENT,
+            AVG_VS_PROJECTED_FUEL_CONSUMPTION.name(),
+            currentOperationCost);
+      }
+
+      if (finished) {
+        // record total time cost for one query
+        long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+        addQueryLatency(
+            AVG_VS_PROJECTED_FUEL_CONSUMPTION,
+            executionTime > 0 ? executionTime : currentOperationCost);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
+      }
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
+  /**
+   * Formats {@code AVG_DAILY_DRIVING_DURATION_SQL_TEMPLATE} with the request's start and end time,
+   * executes it for the current session, and returns the result with the predefined header
+   * metadata for this query.
+   *
+   * <p>Steps: login check, statement creation, authority check, quota check, execution through the
+   * coordinator, then result serialization via {@code QueryDataSetUtils}. Latency metrics are
+   * recorded in the {@code finally} block, and the query execution is cleaned up as soon as the
+   * query is finished (all data returned, non-query result, failed status, or exception).
+   *
+   * @param req request; its {@code startTime}, {@code endTime}, {@code statementId} and timeout
+   *     are used
+   * @return the query response, or a response that carries only an error status
+   * @throws TException declared by the overridden service method
+   */
+  @Override
+  public TSExecuteStatementResp avgDailyDrivingDuration(TSAvgDailyDrivingDurationReq req)
+      throws TException {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    String statement =
+        String.format(AVG_DAILY_DRIVING_DURATION_SQL_TEMPLATE, req.startTime, req.endTime);
+    IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    // quota
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    StatementType statementType = null;
+    Throwable t = null;
+    try {
+      Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId());
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      statementType = s.getType();
+      if (ENABLE_AUDIT_LOG) {
+        AuditLogger.log(statement, s);
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.executeForTreeModel(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              statement,
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        // The error response carries no query id, so the client cannot close this query later.
+        // Mark it finished so the finally block cleans up whatever is held for this query id.
+        finished = true;
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+          resp.setColumnNameIndexMap(AVG_DAILY_DRIVING_DURATIONN_COLUMN_NAME_INDEX_MAP);
+          resp.setSgColumns(Collections.emptyList());
+          resp.setColumns(AVG_DAILY_DRIVING_DURATION_HEADERS);
+          resp.setDataTypeList(AVG_DAILY_DRIVING_DURATION_HEADER_DATA_TYPES);
+          resp.setAliasColumns(AVG_DAILY_DRIVING_DURATIONN_ALIAS_COLUMNS);
+          resp.setIgnoreTimeStamp(false);
+          resp.setQueryId(queryId);
+          resp.setStatus(result.status);
+          Pair<List<ByteBuffer>, Boolean> pair =
+              QueryDataSetUtils.constructAvgDailyDrivingDurationResult(
+                  queryExecution, serde, req.startTime, req.endTime);
+          // pair.right tells whether the whole result has been consumed
+          finished = pair.right;
+          resp.setQueryResult(pair.left);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } catch (Error error) {
+      t = error;
+      throw error;
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+      // record each operation time cost
+      if (statementType != null) {
+        addStatementExecutionLatency(
+            OperationType.EXECUTE_QUERY_STATEMENT,
+            AVG_DAILY_DRIVING_DURATION.name(),
+            currentOperationCost);
+      }
+
+      if (finished) {
+        // record total time cost for one query
+        long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+        addQueryLatency(
+            AVG_DAILY_DRIVING_DURATION, executionTime > 0 ? executionTime : currentOperationCost);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
+      }
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
+  /**
+   * Formats {@code AVG_DAILY_DRIVING_SESSION_SQL_TEMPLATE} with the request's start and end time,
+   * executes it for the current session, and returns the result with the predefined header
+   * metadata for this query.
+   *
+   * <p>Steps: login check, statement creation, authority check, quota check, execution through the
+   * coordinator, then result serialization via {@code QueryDataSetUtils}. Latency metrics are
+   * recorded in the {@code finally} block, and the query execution is cleaned up as soon as the
+   * query is finished (all data returned, non-query result, failed status, or exception).
+   *
+   * @param req request; its {@code startTime}, {@code endTime}, {@code statementId} and timeout
+   *     are used
+   * @return the query response, or a response that carries only an error status
+   * @throws TException declared by the overridden service method
+   */
+  @Override
+  public TSExecuteStatementResp avgDailyDrivingSession(TSAvgDailyDrivingSessionReq req)
+      throws TException {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    String statement =
+        String.format(AVG_DAILY_DRIVING_SESSION_SQL_TEMPLATE, req.startTime, req.endTime);
+    IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    // quota
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    StatementType statementType = null;
+    Throwable t = null;
+    try {
+      Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId());
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      statementType = s.getType();
+      if (ENABLE_AUDIT_LOG) {
+        AuditLogger.log(statement, s);
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.executeForTreeModel(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              statement,
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        // The error response carries no query id, so the client cannot close this query later.
+        // Mark it finished so the finally block cleans up whatever is held for this query id.
+        finished = true;
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+          resp.setColumnNameIndexMap(AVG_DAILY_DRIVING_SESSION_COLUMN_NAME_INDEX_MAP);
+          resp.setSgColumns(Collections.emptyList());
+          resp.setColumns(AVG_DAILY_DRIVING_SESSION_HEADERS);
+          resp.setDataTypeList(AVG_DAILY_DRIVING_SESSION_HEADER_DATA_TYPES);
+          resp.setAliasColumns(AVG_DAILY_DRIVING_SESSION_ALIAS_COLUMNS);
+          resp.setIgnoreTimeStamp(false);
+          resp.setQueryId(queryId);
+          resp.setStatus(result.status);
+          Pair<List<ByteBuffer>, Boolean> pair =
+              QueryDataSetUtils.constructAvgDailyDrivingSessionResult(
+                  queryExecution, serde, req.startTime, req.endTime);
+          // pair.right tells whether the whole result has been consumed
+          finished = pair.right;
+          resp.setQueryResult(pair.left);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } catch (Error error) {
+      t = error;
+      throw error;
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+      // record each operation time cost
+      if (statementType != null) {
+        addStatementExecutionLatency(
+            OperationType.EXECUTE_QUERY_STATEMENT,
+            AVG_DAILY_DRIVING_SESSION.name(),
+            currentOperationCost);
+      }
+
+      if (finished) {
+        // record total time cost for one query
+        long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+        addQueryLatency(
+            AVG_DAILY_DRIVING_SESSION, executionTime > 0 ? executionTime : currentOperationCost);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
+      }
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
+  /**
+   * Runs the fixed {@code AVG_LOAD_SQL_TEMPLATE} statement for the current session and returns the
+   * result with the predefined header metadata for this query.
+   *
+   * <p>Steps: login check, statement creation, authority check, quota check, execution through the
+   * coordinator, then result serialization via {@code QueryDataSetUtils}. Latency metrics are
+   * recorded in the {@code finally} block, and the query execution is cleaned up as soon as the
+   * query is finished (all data returned, non-query result, failed status, or exception).
+   *
+   * @param req request; its {@code statementId} and timeout are used
+   * @return the query response, or a response that carries only an error status
+   * @throws TException declared by the overridden service method
+   */
+  @Override
+  public TSExecuteStatementResp avgLoad(TSAvgLoadReq req) throws TException {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    String statement = AVG_LOAD_SQL_TEMPLATE;
+    IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    // quota
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    StatementType statementType = null;
+    Throwable t = null;
+    try {
+      Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId());
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      statementType = s.getType();
+      if (ENABLE_AUDIT_LOG) {
+        AuditLogger.log(statement, s);
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.executeForTreeModel(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              statement,
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        // The error response carries no query id, so the client cannot close this query later.
+        // Mark it finished so the finally block cleans up whatever is held for this query id.
+        finished = true;
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+          resp.setColumnNameIndexMap(AVG_LOAD_COLUMN_NAME_INDEX_MAP);
+          resp.setSgColumns(Collections.emptyList());
+          resp.setColumns(AVG_LOAD_HEADERS);
+          resp.setDataTypeList(AVG_LOAD_HEADER_DATA_TYPES);
+          resp.setAliasColumns(AVG_LOAD_ALIAS_COLUMNS);
+          resp.setIgnoreTimeStamp(true);
+          resp.setQueryId(queryId);
+          resp.setStatus(result.status);
+          Pair<List<ByteBuffer>, Boolean> pair =
+              QueryDataSetUtils.constructAvgLoadResult(
+                  queryExecution, DEVICE_ATTRIBUTES_MAP, serde);
+          // pair.right tells whether the whole result has been consumed
+          finished = pair.right;
+          resp.setQueryResult(pair.left);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } catch (Error error) {
+      t = error;
+      throw error;
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+      // record each operation time cost
+      if (statementType != null) {
+        addStatementExecutionLatency(
+            OperationType.EXECUTE_QUERY_STATEMENT, AVG_LOAD.name(), currentOperationCost);
+      }
+
+      if (finished) {
+        // record total time cost for one query
+        long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+        addQueryLatency(AVG_LOAD, executionTime > 0 ? executionTime : currentOperationCost);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
+      }
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
+  /**
+   * Formats {@code DAILY_ACTIVITY_SQL_TEMPLATE} with the request's start and end time, executes it
+   * for the current session, and returns the result with the predefined header metadata for this
+   * query.
+   *
+   * <p>Steps: login check, statement creation, authority check, quota check, execution through the
+   * coordinator, then result serialization via {@code QueryDataSetUtils}. Latency metrics are
+   * recorded in the {@code finally} block, and the query execution is cleaned up as soon as the
+   * query is finished (all data returned, non-query result, failed status, or exception).
+   *
+   * @param req request; its {@code startTime}, {@code endTime}, {@code statementId} and timeout
+   *     are used
+   * @return the query response, or a response that carries only an error status
+   * @throws TException declared by the overridden service method
+   */
+  @Override
+  public TSExecuteStatementResp dailyActivity(TSDailyActivityReq req) throws TException {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    String statement = String.format(DAILY_ACTIVITY_SQL_TEMPLATE, req.startTime, req.endTime);
+    IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    // quota
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    StatementType statementType = null;
+    Throwable t = null;
+    try {
+      Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId());
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      statementType = s.getType();
+      if (ENABLE_AUDIT_LOG) {
+        AuditLogger.log(statement, s);
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.executeForTreeModel(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              statement,
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        // The error response carries no query id, so the client cannot close this query later.
+        // Mark it finished so the finally block cleans up whatever is held for this query id.
+        finished = true;
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+          resp.setColumnNameIndexMap(DAILY_ACTIVITY_COLUMN_NAME_INDEX_MAP);
+          resp.setSgColumns(Collections.emptyList());
+          resp.setColumns(DAILY_ACTIVITY_HEADERS);
+          resp.setDataTypeList(DAILY_ACTIVITY_HEADER_DATA_TYPES);
+          resp.setAliasColumns(DAILY_ACTIVITY_ALIAS_COLUMNS);
+          resp.setIgnoreTimeStamp(false);
+          resp.setQueryId(queryId);
+          resp.setStatus(result.status);
+          Pair<List<ByteBuffer>, Boolean> pair =
+              QueryDataSetUtils.constructDailyActivityResult(queryExecution, serde);
+          // pair.right tells whether the whole result has been consumed
+          finished = pair.right;
+          resp.setQueryResult(pair.left);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } catch (Error error) {
+      t = error;
+      throw error;
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+      // record each operation time cost
+      if (statementType != null) {
+        addStatementExecutionLatency(
+            OperationType.EXECUTE_QUERY_STATEMENT, DAILY_ACTIVITY.name(), currentOperationCost);
+      }
+
+      if (finished) {
+        // record total time cost for one query
+        long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+        addQueryLatency(DAILY_ACTIVITY, executionTime > 0 ? executionTime : currentOperationCost);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
+      }
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
+  /**
+   * Placeholder for the breakdown-frequency query, which has no implementation yet.
+   *
+   * <p>Returns an explicit error status instead of {@code null}: a {@code null} return value from
+   * a Thrift handler leaves the reply without a success value, which the caller can only observe
+   * as an opaque "unknown result" failure.
+   *
+   * @param req the request (currently unused)
+   * @return a response carrying only a "not supported" error status
+   * @throws TException declared by the overridden service method
+   */
+  @Override
+  public TSExecuteStatementResp breakdownFrequency(TSBreakdownFrequencyReq req) throws TException {
+    return RpcUtils.getTSExecuteStatementResp(
+        RpcUtils.getStatus(
+            TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
+  }
+
   @Override
   public long requestStatementId(long sessionId) {
     return 
SESSION_MANAGER.requestStatementId(SESSION_MANAGER.getCurrSession());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index 88906ba83f4..817e427fb57 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -184,4 +184,14 @@ public enum StatementType {
   SHOW_TOPICS,
 
   SHOW_SUBSCRIPTIONS,
+
+  HIGH_LOAD,
+  LONG_DRIVING_SESSIONS,
+  LONG_DAILY_SESSIONS,
+  AVG_VS_PROJECTED_FUEL_CONSUMPTION,
+  AVG_DAILY_DRIVING_DURATION,
+  AVG_DAILY_DRIVING_SESSION,
+  AVG_LOAD,
+  DAILY_ACTIVITY,
+  BREAKDOWN_FREQUENCY,
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 365b7d85b2d..7dd872122f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -20,16 +20,28 @@
 package org.apache.iotdb.db.utils;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl;
+import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.AggrWindowIterator;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 
 import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.tsfile.read.common.block.column.LongColumn;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.tsfile.read.common.block.column.TsBlockSerde;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.BytesUtils;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TimeDuration;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
 
 import java.io.ByteArrayOutputStream;
@@ -37,10 +49,29 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_DAILY_DRIVING_DURATION_DATA_TYPES;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_DAILY_DRIVING_SESSION_DATA_TYPES;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_LOAD_DATA_TYPES;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_VS_PROJ_FUEL_CONSUMPTION_DATA_TYPES;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.DAILY_ACTIVITY_DATA_TYPES;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.DRIVER_LEVEL;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.FLEET_LEVEL;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.HIGH_LOAD_DATA_TYPES;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.LONG_DRIVING_SESSIONS_DATA_TYPES;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.MODEL_LEVEL;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.NAME_LEVEL;
+import static 
org.apache.iotdb.db.utils.TimestampPrecisionUtils.convertToCurrPrecision;
 
 /** TimeValuePairUtils to convert between thrift format and TsFile format. */
 public class QueryDataSetUtils {
@@ -843,4 +874,598 @@ public class QueryDataSetUtils {
     }
     values[columnIndex] = binaryValues;
   }
+
+  /**
+   * Builds the result of the "high load" query: every input row whose value is at least 90% of
+   * the owning device's load capacity is copied to the output.
+   *
+   * <p>Each input batch is read as (time, column 0 = full series path, column 1 = value rendered
+   * as text). Name and driver are the path segments at {@code NAME_LEVEL} and {@code
+   * DRIVER_LEVEL}; the load capacity is looked up in {@code deviceAttributesMap} under the series
+   * path with its last 13 characters removed.
+   *
+   * @param queryExecution source of the input batches, drained completely
+   * @param deviceAttributesMap device attributes keyed by device path; a device missing from the
+   *     map makes the lookup fail with a {@link NullPointerException}
+   * @param serde serializer used for the single output TsBlock
+   * @return a pair of (singleton list with the serialized TsBlock, {@code true})
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructHighLoadResult(
+      IQueryExecution queryExecution,
+      Map<String, ClientRPCServiceImpl.DeviceAttributes> deviceAttributesMap,
+      TsBlockSerde serde)
+      throws IoTDBException, IOException {
+    // presumably the length of a ".current_load" measurement suffix - confirm against the caller
+    final int suffixLength = 13;
+
+    TsBlockBuilder resultBuilder = new TsBlockBuilder(HIGH_LOAD_DATA_TYPES);
+    TimeColumnBuilder timeOut = resultBuilder.getTimeColumnBuilder();
+    ColumnBuilder nameOut = resultBuilder.getColumnBuilder(0);
+    ColumnBuilder driverOut = resultBuilder.getColumnBuilder(1);
+    ColumnBuilder currentLoadOut = resultBuilder.getColumnBuilder(2);
+    ColumnBuilder loadCapacityOut = resultBuilder.getColumnBuilder(3);
+
+    // an absent batch marks the end of the input
+    for (Optional<TsBlock> batch = queryExecution.getBatchResult();
+        batch.isPresent();
+        batch = queryExecution.getBatchResult()) {
+      TsBlock block = batch.get();
+      if (block.isEmpty()) {
+        continue;
+      }
+
+      BinaryColumn seriesPaths = (BinaryColumn) block.getColumn(0);
+      BinaryColumn textValues = (BinaryColumn) block.getColumn(1);
+      int rows = block.getPositionCount();
+      for (int row = 0; row < rows; row++) {
+        String seriesPath = seriesPaths.getBinary(row).getStringValue(StandardCharsets.UTF_8);
+        String[] segments = seriesPath.split("\\.");
+        String name = segments[NAME_LEVEL];
+        String driver = segments[DRIVER_LEVEL];
+        String text = textValues.getBinary(row).getStringValue(StandardCharsets.UTF_8);
+        double currentLoad = Double.parseDouble(text);
+        String devicePath = seriesPath.substring(0, seriesPath.length() - suffixLength);
+        double loadCapacity = deviceAttributesMap.get(devicePath).loadCapacity;
+
+        // keep only rows at or above 90% of the capacity (a NaN value never qualifies)
+        if (!(currentLoad >= 0.9 * loadCapacity)) {
+          continue;
+        }
+
+        resultBuilder.declarePosition();
+        timeOut.writeLong(block.getTimeByIndex(row));
+        nameOut.writeBinary(new Binary(name, StandardCharsets.UTF_8));
+        driverOut.writeBinary(new Binary(driver, StandardCharsets.UTF_8));
+        currentLoadOut.writeDouble(currentLoad);
+        loadCapacityOut.writeDouble(loadCapacity);
+      }
+    }
+
+    // the input has been consumed completely, so the result is always reported as finished
+    ByteBuffer serialized = serde.serialize(resultBuilder.build());
+    return new Pair<>(Collections.singletonList(serialized), Boolean.TRUE);
+  }
+
+  /**
+   * Builds the result of the "long driving sessions" query: one output row (name, driver) for
+   * every device that contributes more than {@code threshold} input rows.
+   *
+   * <p>Column 0 of every input batch is read as the device path. Rows of the same device are
+   * counted while they are adjacent, so the input is expected to arrive grouped by device
+   * (NOTE(review): grouping is assumed, not enforced here - confirm against the caller's query).
+   *
+   * <p>The last device is now checked against {@code threshold} as well; it used to be compared
+   * with a hard-coded 22, which disagreed with the check applied to all other devices.
+   *
+   * @param queryExecution source of the input batches, drained completely
+   * @param serde serializer used for the single output TsBlock
+   * @param threshold a device is reported only when its row count is strictly greater than this
+   * @return a pair of (singleton list with the serialized TsBlock, {@code true})
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructLongDrivingSessionsResult(
+      IQueryExecution queryExecution, TsBlockSerde serde, int threshold)
+      throws IoTDBException, IOException {
+    boolean finished;
+
+    TsBlockBuilder builder = new TsBlockBuilder(LONG_DRIVING_SESSIONS_DATA_TYPES);
+
+    // sentinel that never equals a real device path; its count stays 0
+    Binary previousDevice = new Binary("", StandardCharsets.UTF_8);
+    int count = 0;
+    while (true) {
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      if (!optionalTsBlock.isPresent()) {
+        finished = true;
+        break;
+      }
+      TsBlock tsBlock = optionalTsBlock.get();
+
+      if (!tsBlock.isEmpty()) {
+        BinaryColumn deviceColumn = (BinaryColumn) tsBlock.getColumn(0);
+        int currentCount = tsBlock.getPositionCount();
+        for (int i = 0; i < currentCount; i++) {
+          Binary deviceId = deviceColumn.getBinary(i);
+          // TODO ty optimize Binary equals method
+          if (previousDevice.equals(deviceId)) {
+            count++;
+          } else {
+            // device changed, decide whether the previous device qualifies
+            if (count > threshold) {
+              appendLongDrivingSession(builder, previousDevice);
+            }
+            previousDevice = deviceId;
+            count = 1;
+          }
+        }
+      }
+    }
+
+    // the last device has no successor to trigger the check above, so apply it here
+    if (count > threshold) {
+      appendLongDrivingSession(builder, previousDevice);
+    }
+
+    return new Pair<>(Collections.singletonList(serde.serialize(builder.build())), finished);
+  }
+
+  /** Appends one (time = 0, name, driver) row for {@code device} to {@code builder}. */
+  private static void appendLongDrivingSession(TsBlockBuilder builder, Binary device) {
+    String[] seriesArray = device.getStringValue(StandardCharsets.UTF_8).split("\\.");
+    String name = seriesArray[NAME_LEVEL];
+    String driver = seriesArray[DRIVER_LEVEL];
+    builder.declarePosition();
+    builder.getTimeColumnBuilder().writeLong(0);
+    // name column
+    builder.getColumnBuilder(0).writeBinary(new Binary(name, StandardCharsets.UTF_8));
+    // driver column
+    builder.getColumnBuilder(1).writeBinary(new Binary(driver, StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Builds the result of the "average vs projected fuel consumption" query, aggregated per model.
+   *
+   * <p>Each input row is read as (column 0 = device path, column 1 = fuel sum, column 2 = sample
+   * count). Rows are folded into one accumulator per model (path segment at {@code MODEL_LEVEL});
+   * the device's nominal fuel consumption from {@code deviceAttributesMap} is weighted by the
+   * sample count. One output row per model carries the two resulting averages.
+   *
+   * @param queryExecution source of the input batches, drained completely
+   * @param deviceAttributesMap device attributes keyed by device path; a device missing from the
+   *     map makes the lookup fail with a {@link NullPointerException}
+   * @param serde serializer used for the single output TsBlock
+   * @return a pair of (singleton list with the serialized TsBlock, {@code true})
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructAvgVsProjectedFuelConsumptionResult(
+      IQueryExecution queryExecution,
+      Map<String, ClientRPCServiceImpl.DeviceAttributes> deviceAttributesMap,
+      TsBlockSerde serde)
+      throws IoTDBException, IOException {
+    Map<String, AvgIntermediateResult> perModel = new HashMap<>();
+
+    // an absent batch marks the end of the input
+    for (Optional<TsBlock> batch = queryExecution.getBatchResult();
+        batch.isPresent();
+        batch = queryExecution.getBatchResult()) {
+      TsBlock block = batch.get();
+      if (block.isEmpty()) {
+        continue;
+      }
+
+      BinaryColumn devices = (BinaryColumn) block.getColumn(0);
+      DoubleColumn fuelSums = (DoubleColumn) block.getColumn(1);
+      LongColumn fuelCounts = (LongColumn) block.getColumn(2);
+
+      int rows = block.getPositionCount();
+      for (int row = 0; row < rows; row++) {
+        String deviceId = devices.getBinary(row).getStringValue(StandardCharsets.UTF_8);
+        double nominal = deviceAttributesMap.get(deviceId).nominalFuelConsumption;
+        String model = deviceId.split("\\.")[MODEL_LEVEL];
+        AvgIntermediateResult acc =
+            perModel.computeIfAbsent(model, ignored -> new AvgIntermediateResult());
+        long samples = fuelCounts.getLong(row);
+        acc.count += samples;
+        acc.fuelSum += fuelSums.getDouble(row);
+        acc.nominalFuelSum += (nominal * samples);
+      }
+    }
+
+    int groups = perModel.size();
+    TsBlockBuilder resultBuilder =
+        new TsBlockBuilder(groups, AVG_VS_PROJ_FUEL_CONSUMPTION_DATA_TYPES);
+    TimeColumnBuilder timeOut = resultBuilder.getTimeColumnBuilder();
+    ColumnBuilder modelOut = resultBuilder.getColumnBuilder(0);
+    ColumnBuilder avgFuelOut = resultBuilder.getColumnBuilder(1);
+    ColumnBuilder nominalFuelOut = resultBuilder.getColumnBuilder(2);
+
+    for (Map.Entry<String, AvgIntermediateResult> entry : perModel.entrySet()) {
+      AvgIntermediateResult acc = entry.getValue();
+      timeOut.writeLong(0);
+      modelOut.writeBinary(new Binary(entry.getKey(), StandardCharsets.UTF_8));
+      avgFuelOut.writeDouble(acc.fuelSum / acc.count);
+      nominalFuelOut.writeDouble(acc.nominalFuelSum / acc.count);
+    }
+
+    // all rows were written column by column above; register them in one go
+    resultBuilder.declarePositions(groups);
+
+    ByteBuffer serialized = serde.serialize(resultBuilder.build());
+    return new Pair<>(Collections.singletonList(serialized), Boolean.TRUE);
+  }
+
+  /**
+   * Mutable accumulator used while averaging fuel consumption for one group: the number of
+   * samples plus the running sums of measured and nominal fuel consumption. All fields start at
+   * zero (Java's default field values).
+   */
+  private static class AvgIntermediateResult {
+    /** number of samples folded in so far */
+    long count;
+
+    /** running sum of the measured fuel consumption */
+    double fuelSum;
+
+    /** running sum of the nominal fuel consumption, weighted by sample count */
+    double nominalFuelSum;
+  }
+
+  /**
+   * Builds the result of the "average daily driving duration" query.
+   *
+   * <p>Drains {@code queryExecution} batch by batch; column 0 of each batch is read as the device
+   * path. Consecutive rows with the same device path are treated as one device (NOTE(review):
+   * the logic relies on rows arriving grouped by device and ordered by time within a device, but
+   * does not enforce it - confirm against the caller's query). For each device, rows are written
+   * per one-day window taken from an {@code AggrWindowIterator} built over {@code startTime} and
+   * {@code endTime}: the value is the number of input rows inside the window divided by 6
+   * (presumably every input row stands for a 10-minute bucket, so the quotient is hours - TODO
+   * confirm). Windows that are skipped because no row falls into them get 0.0.
+   *
+   * @param queryExecution source of the input batches, drained completely
+   * @param serde serializer used for the single output TsBlock
+   * @param startTime start passed to the one-day window iterator
+   * @param endTime end passed to the one-day window iterator
+   * @return a pair of (singleton list with the serialized TsBlock, {@code true}); the flag is
+   *     always {@code true} because the loop only ends once the input is exhausted
+   */
+  public static Pair<List<ByteBuffer>, Boolean> 
constructAvgDailyDrivingDurationResult(
+      IQueryExecution queryExecution, TsBlockSerde serde, long startTime, long 
endTime)
+      throws IoTDBException, IOException {
+    boolean finished;
+
+    TsBlockBuilder builder = new 
TsBlockBuilder(AVG_DAILY_DRIVING_DURATION_DATA_TYPES);
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    ColumnBuilder fleetColumnBuilder = builder.getColumnBuilder(0);
+    ColumnBuilder nameColumnBuilder = builder.getColumnBuilder(1);
+    ColumnBuilder driverColumnBuilder = builder.getColumnBuilder(2);
+    ColumnBuilder avgDailyHoursColumnBuilder = builder.getColumnBuilder(3);
+
+    // sentinel device; fleet stays null until the first real device has been seen
+    Binary previousDevice = new Binary("", StandardCharsets.UTF_8);
+    Binary fleet = null;
+    Binary name = null;
+    Binary driver = null;
+
+    // one day in the current timestamp precision, passed for both duration arguments of the
+    // iterator; the meaning of the two boolean flags is defined by AggrWindowIterator (not
+    // visible here)
+    TimeDuration interval = new TimeDuration(0, convertToCurrPrecision(1, 
TimeUnit.DAYS));
+    AggrWindowIterator timeRangeIterator =
+        new AggrWindowIterator(startTime, endTime, interval, interval, true, 
true);
+    TimeRange currentTimeRange = timeRangeIterator.nextTimeRange();
+    // number of rows of the current device seen inside currentTimeRange
+    int count = 0;
+    while (true) {
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      if (!optionalTsBlock.isPresent()) {
+        finished = true;
+        break;
+      }
+      TsBlock tsBlock = optionalTsBlock.get();
+
+      if (!tsBlock.isEmpty()) {
+        TimeColumn timeColumn = tsBlock.getTimeColumn();
+        BinaryColumn deviceColumn = (BinaryColumn) tsBlock.getColumn(0);
+        int currentCount = tsBlock.getPositionCount();
+        for (int i = 0; i < currentCount; i++) {
+          Binary deviceId = deviceColumn.getBinary(i);
+          long currentTime = timeColumn.getLong(i);
+          if (previousDevice.equals(deviceId)) {
+            // calculate current day
+            if (currentTimeRange.contains(
+                currentTime)) { // current time range for current device is 
not finished
+              count++;
+            } else {
+              // previous day's result is done
+              builder.declarePosition();
+              timeColumnBuilder.writeLong(currentTimeRange.getMin());
+              fleetColumnBuilder.writeBinary(fleet);
+              nameColumnBuilder.writeBinary(name);
+              driverColumnBuilder.writeBinary(driver);
+              avgDailyHoursColumnBuilder.writeDouble(count / 6.0d);
+
+              // move time range to next one that contains the current time
+              // NOTE(review): assumes currentTime lies inside some remaining window; what
+              // nextTimeRange() returns once the iterator is exhausted is not visible here
+              currentTimeRange = timeRangeIterator.nextTimeRange();
+              while (!currentTimeRange.contains(currentTime)) {
+                builder.declarePosition();
+                timeColumnBuilder.writeLong(currentTimeRange.getMin());
+                fleetColumnBuilder.writeBinary(fleet);
+                nameColumnBuilder.writeBinary(name);
+                driverColumnBuilder.writeBinary(driver);
+                avgDailyHoursColumnBuilder.writeDouble(0.0d);
+                currentTimeRange = timeRangeIterator.nextTimeRange();
+              }
+              count = 1;
+            }
+          } else { // device changed, we need to decide whether to output 
previous device
+
+            if (fleet != null) {
+              // construct remaining data for previous device
+              // NOTE(review): the loop fetches the next window before testing
+              // hasNextTimeRange(); whether the very last window is emitted when the device
+              // has no row in it depends on AggrWindowIterator semantics - confirm
+              do {
+                builder.declarePosition();
+                timeColumnBuilder.writeLong(currentTimeRange.getMin());
+                fleetColumnBuilder.writeBinary(fleet);
+                nameColumnBuilder.writeBinary(name);
+                driverColumnBuilder.writeBinary(driver);
+                avgDailyHoursColumnBuilder.writeDouble(count / 6.0d);
+                currentTimeRange = timeRangeIterator.nextTimeRange();
+                count = 0;
+              } while (timeRangeIterator.hasNextTimeRange());
+            }
+
+            // reset
+            previousDevice = deviceId;
+            String[] splitArray = 
deviceId.getStringValue(StandardCharsets.UTF_8).split("\\.");
+            fleet = new Binary(splitArray[FLEET_LEVEL], 
StandardCharsets.UTF_8);
+            name = new Binary(splitArray[NAME_LEVEL], StandardCharsets.UTF_8);
+            driver = new Binary(splitArray[DRIVER_LEVEL], 
StandardCharsets.UTF_8);
+            timeRangeIterator.reset();
+
+            // move time range to next one that contains the current time
+            currentTimeRange = timeRangeIterator.nextTimeRange();
+            while (!currentTimeRange.contains(currentTime)) {
+              builder.declarePosition();
+              timeColumnBuilder.writeLong(currentTimeRange.getMin());
+              fleetColumnBuilder.writeBinary(fleet);
+              nameColumnBuilder.writeBinary(name);
+              driverColumnBuilder.writeBinary(driver);
+              avgDailyHoursColumnBuilder.writeDouble(0.0d);
+              currentTimeRange = timeRangeIterator.nextTimeRange();
+            }
+            count = 1;
+          }
+        }
+      }
+    }
+
+    // output last device
+    if (fleet != null) {
+      // construct remaining data for last device
+      // same flush loop as on a device change; the first pass writes the pending count, later
+      // passes write 0 because count is reset inside the loop
+      do {
+        builder.declarePosition();
+        timeColumnBuilder.writeLong(currentTimeRange.getMin());
+        fleetColumnBuilder.writeBinary(fleet);
+        nameColumnBuilder.writeBinary(name);
+        driverColumnBuilder.writeBinary(driver);
+        avgDailyHoursColumnBuilder.writeDouble(count / 6.0d);
+        currentTimeRange = timeRangeIterator.nextTimeRange();
+        count = 0;
+      } while (timeRangeIterator.hasNextTimeRange());
+    }
+
+    return new 
Pair<>(Collections.singletonList(serde.serialize(builder.build())), finished);
+  }
+
+  /**
+   * Builds the result of the "average daily driving session" query.
+   *
+   * <p>Input rows (time, column 0 = device path, column 1 = velocity) are consumed in chunks of
+   * 144 consecutive rows; every chunk is treated as one day of one device and yields one output
+   * row (day start time, device name, average session duration). A session is a maximal run of
+   * rows whose velocity is non-null and greater than 5; it lasts from its first row to the first
+   * following row that is not driving, or to the end of the day when it is still open.
+   *
+   * <p>A day without any session used to fail with an {@link ArithmeticException} (long division
+   * by zero); such a day now reports an average duration of 0.
+   *
+   * <p>NOTE(review): the chunking assumes exactly 144 rows per device and day and exactly 10
+   * days per device, and {@code endTime} is not consulted - confirm against the caller's query.
+   *
+   * @param queryExecution source of the input batches, drained completely
+   * @param serde serializer used for the single output TsBlock
+   * @param startTime timestamp of the first day's start; day {@code d} starts at {@code
+   *     startTime + d * oneDay}
+   * @param endTime currently unused
+   * @return a pair of (singleton list with the serialized TsBlock, {@code true})
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructAvgDailyDrivingSessionResult(
+      IQueryExecution queryExecution, TsBlockSerde serde, long startTime, long endTime)
+      throws IoTDBException, IOException {
+    boolean finished;
+
+    TsBlockBuilder builder = new TsBlockBuilder(AVG_DAILY_DRIVING_SESSION_DATA_TYPES);
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    ColumnBuilder nameColumnBuilder = builder.getColumnBuilder(0);
+    ColumnBuilder durationColumnBuilder = builder.getColumnBuilder(1);
+
+    int numOfTenMinutesInOneDay = 144;
+    List<Double> velocityList = new ArrayList<>(numOfTenMinutesInOneDay);
+    List<Long> timeList = new ArrayList<>(numOfTenMinutesInOneDay);
+    long dayDuration = convertToCurrPrecision(1, TimeUnit.DAYS);
+    int day = 0;
+    while (true) {
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      if (!optionalTsBlock.isPresent()) {
+        finished = true;
+        break;
+      }
+      TsBlock tsBlock = optionalTsBlock.get();
+
+      if (!tsBlock.isEmpty()) {
+        TimeColumn timeColumn = tsBlock.getTimeColumn();
+        BinaryColumn deviceColumn = (BinaryColumn) tsBlock.getColumn(0);
+        DoubleColumn velocityColumn = (DoubleColumn) tsBlock.getColumn(1);
+        int currentCount = tsBlock.getPositionCount();
+        for (int i = 0; i < currentCount; i++) {
+          timeList.add(timeColumn.getLong(i));
+          // a null velocity is kept as null so that it counts as "not driving"
+          velocityList.add(velocityColumn.isNull(i) ? null : velocityColumn.getDouble(i));
+
+          if (timeList.size() == numOfTenMinutesInOneDay) {
+            long dayStart = startTime + day * dayDuration;
+            builder.declarePosition();
+            timeColumnBuilder.writeLong(dayStart);
+            nameColumnBuilder.writeBinary(
+                new Binary(
+                    deviceColumn
+                        .getBinary(i)
+                        .getStringValue(StandardCharsets.UTF_8)
+                        .split("\\.")[NAME_LEVEL],
+                    StandardCharsets.UTF_8));
+            durationColumnBuilder.writeLong(
+                averageDrivingSessionDuration(timeList, velocityList, dayStart + dayDuration));
+
+            timeList.clear();
+            velocityList.clear();
+
+            day++;
+            // 10 days is done, move to next device
+            if (day == 10) {
+              day = 0;
+            }
+          }
+        }
+      }
+    }
+
+    return new Pair<>(Collections.singletonList(serde.serialize(builder.build())), finished);
+  }
+
+  /**
+   * Returns the average duration of the driving sessions found in one day of samples, or 0 when
+   * the day contains no session. A session still open after the last sample ends at {@code
+   * dayEnd}.
+   */
+  private static long averageDrivingSessionDuration(
+      List<Long> timeList, List<Double> velocityList, long dayEnd) {
+    long durationSum = 0;
+    long durationCount = 0;
+    Long start = null;
+    for (int index = 0; index < timeList.size(); index++) {
+      Double v = velocityList.get(index);
+      if (v != null && v > 5) {
+        if (start == null) {
+          start = timeList.get(index);
+        }
+      } else if (start != null) {
+        // the running session ends at the first sample that is not driving
+        durationSum += (timeList.get(index) - start);
+        durationCount++;
+        start = null;
+      }
+    }
+
+    if (start != null) {
+      durationSum += (dayEnd - start);
+      durationCount++;
+    }
+    // guard the long division: a day without sessions has no average
+    return durationCount == 0 ? 0L : durationSum / durationCount;
+  }
+
+  /**
+   * Builds the result of the "average load" query, grouped by (fleet, model, load capacity).
+   *
+   * <p>Each input row is read as (column 0 = device path, column 1 = average load). Rows with a
+   * null average load are skipped. For the remaining rows the ratio {@code avgLoad /
+   * loadCapacity} is accumulated per group, and one output row per group carries the mean ratio.
+   *
+   * @param queryExecution source of the input batches, drained completely
+   * @param deviceAttributesMap device attributes keyed by device path; a device missing from the
+   *     map makes the lookup fail with a {@link NullPointerException}
+   * @param serde serializer used for the single output TsBlock
+   * @return a pair of (singleton list with the serialized TsBlock, {@code true})
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructAvgLoadResult(
+      IQueryExecution queryExecution,
+      Map<String, ClientRPCServiceImpl.DeviceAttributes> deviceAttributesMap,
+      TsBlockSerde serde)
+      throws IoTDBException, IOException {
+    Map<AvgLoadKey, AvgLoadIntermediateResult> groups = new HashMap<>();
+
+    // an absent batch marks the end of the input
+    for (Optional<TsBlock> batch = queryExecution.getBatchResult();
+        batch.isPresent();
+        batch = queryExecution.getBatchResult()) {
+      TsBlock block = batch.get();
+      if (block.isEmpty()) {
+        continue;
+      }
+
+      BinaryColumn devices = (BinaryColumn) block.getColumn(0);
+      DoubleColumn avgLoads = (DoubleColumn) block.getColumn(1);
+      int rows = block.getPositionCount();
+      for (int row = 0; row < rows; row++) {
+        if (!avgLoads.isNull(row)) {
+          String deviceId = devices.getBinary(row).getStringValue(StandardCharsets.UTF_8);
+          String[] segments = deviceId.split("\\.");
+          String fleet = segments[FLEET_LEVEL];
+          String model = segments[MODEL_LEVEL];
+          double avgLoad = avgLoads.getDouble(row);
+          double loadCapacity = deviceAttributesMap.get(deviceId).loadCapacity;
+          AvgLoadIntermediateResult acc =
+              groups.computeIfAbsent(
+                  new AvgLoadKey(fleet, model, loadCapacity),
+                  ignored -> new AvgLoadIntermediateResult());
+          acc.sum += (avgLoad / loadCapacity);
+          acc.count++;
+        }
+      }
+    }
+
+    int groupCount = groups.size();
+
+    TsBlockBuilder resultBuilder = new TsBlockBuilder(groupCount, AVG_LOAD_DATA_TYPES);
+    TimeColumnBuilder timeOut = resultBuilder.getTimeColumnBuilder();
+    ColumnBuilder fleetOut = resultBuilder.getColumnBuilder(0);
+    ColumnBuilder modelOut = resultBuilder.getColumnBuilder(1);
+    ColumnBuilder loadCapacityOut = resultBuilder.getColumnBuilder(2);
+    ColumnBuilder avgLoadOut = resultBuilder.getColumnBuilder(3);
+
+    for (Map.Entry<AvgLoadKey, AvgLoadIntermediateResult> entry : groups.entrySet()) {
+      AvgLoadKey key = entry.getKey();
+      AvgLoadIntermediateResult acc = entry.getValue();
+      timeOut.writeLong(0L);
+      fleetOut.writeBinary(new Binary(key.fleet, StandardCharsets.UTF_8));
+      modelOut.writeBinary(new Binary(key.model, StandardCharsets.UTF_8));
+      loadCapacityOut.writeDouble(key.loadCapacity);
+      avgLoadOut.writeDouble(acc.sum / acc.count);
+    }
+
+    // all rows were written column by column above; register them in one go
+    resultBuilder.declarePositions(groupCount);
+
+    ByteBuffer serialized = serde.serialize(resultBuilder.build());
+    return new Pair<>(Collections.singletonList(serialized), Boolean.TRUE);
+  }
+
+  /**
+   * Mutable accumulator for one average-load group: how many ratios were added and their running
+   * sum. Both fields start at zero (Java's default field values).
+   */
+  private static class AvgLoadIntermediateResult {
+    /** number of ratios folded in so far */
+    long count;
+
+    /** running sum of the ratios */
+    double sum;
+  }
+
+  private static class AvgLoadKey {
+    private final String fleet;
+
+    private final String model;
+
+    private final double loadCapacity;
+
+    public AvgLoadKey(String fleet, String model, double loadCapacity) {
+      this.fleet = fleet;
+      this.model = model;
+      this.loadCapacity = loadCapacity;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      AvgLoadKey that = (AvgLoadKey) o;
+      return Double.compare(loadCapacity, that.loadCapacity) == 0
+          && Objects.equals(fleet, that.fleet)
+          && Objects.equals(model, that.model);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(fleet, model, loadCapacity);
+    }
+  }
+
+  public static Pair<List<ByteBuffer>, Boolean> constructDailyActivityResult(
+      IQueryExecution queryExecution, TsBlockSerde serde) throws 
IoTDBException, IOException {
+    boolean finished;
+
+    Map<DailyActivityKey, DailyActivityValue> map = new HashMap<>();
+
+    long dayDuration = convertToCurrPrecision(1, TimeUnit.DAYS);
+
+    while (true) {
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      if (!optionalTsBlock.isPresent()) {
+        finished = true;
+        break;
+      }
+      TsBlock tsBlock = optionalTsBlock.get();
+
+      if (!tsBlock.isEmpty()) {
+        TimeColumn timeColumn = tsBlock.getTimeColumn();
+        BinaryColumn deviceColumn = (BinaryColumn) tsBlock.getColumn(0);
+        int currentCount = tsBlock.getPositionCount();
+        for (int i = 0; i < currentCount; i++) {
+          String deviceId = 
deviceColumn.getBinary(i).getStringValue(StandardCharsets.UTF_8);
+          String[] deviceArray = deviceId.split("\\.");
+          String fleet = deviceArray[FLEET_LEVEL];
+          String model = deviceArray[MODEL_LEVEL];
+          long day = timeColumn.getLong(i) / dayDuration * dayDuration;
+          DailyActivityKey key = new DailyActivityKey(fleet, model, day);
+          DailyActivityValue value = map.computeIfAbsent(key, k -> new 
DailyActivityValue());
+          value.count++;
+        }
+      }
+    }
+
+    int size = map.size();
+
+    TsBlockBuilder builder = new TsBlockBuilder(size, 
DAILY_ACTIVITY_DATA_TYPES);
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    ColumnBuilder fleetColumnBuilder = builder.getColumnBuilder(0);
+    ColumnBuilder modelColumnBuilder = builder.getColumnBuilder(1);
+    ColumnBuilder dailyActivityColumnBuilder = builder.getColumnBuilder(2);
+
+    map.forEach(
+        (k, v) -> {
+          timeColumnBuilder.writeLong(k.day);
+          fleetColumnBuilder.writeBinary(new Binary(k.fleet, 
StandardCharsets.UTF_8));
+          modelColumnBuilder.writeBinary(new Binary(k.model, 
StandardCharsets.UTF_8));
+          dailyActivityColumnBuilder.writeLong(v.count / 144);
+        });
+
+    builder.declarePositions(size);
+
+    return new 
Pair<>(Collections.singletonList(serde.serialize(builder.build())), finished);
+  }
+
+  private static class DailyActivityKey {
+    private final String fleet;
+
+    private final String model;
+
+    private final long day;
+
+    public DailyActivityKey(String fleet, String model, long day) {
+      this.fleet = fleet;
+      this.model = model;
+      this.day = day;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      DailyActivityKey that = (DailyActivityKey) o;
+      return day == that.day
+          && Objects.equals(fleet, that.fleet)
+          && Objects.equals(model, that.model);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(fleet, model, day);
+    }
+  }
+
+  /** Mutable row counter for one daily-activity group; starts at zero (Java's default). */
+  private static class DailyActivityValue {
+    /** number of rows counted so far */
+    long count;
+  }
 }
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index a99a8e599c9..c59bf4addf8 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -528,6 +528,85 @@ struct TSConnectionInfoResp {
   1: required list<TSConnectionInfo> connectionInfoList
 }
 
+// Request payload of IClientRPCService.highLoad.
+struct TSHighLoadReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required string fleet
+  4: required i32 fetchSize
+  5: required i64 timeout
+}
+
+// Request payload of IClientRPCService.longDrivingSessions.
+// NOTE(review): the unit and inclusiveness of startTime/endTime are not visible here - confirm.
+struct TSLongDrivingSessionsReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required string fleet
+  4: required i64 startTime
+  5: required i64 endTime
+  6: required i32 fetchSize
+  7: required i64 timeout
+}
+
+// Request payload of IClientRPCService.longDailySessions.
+// Carries the same fields as TSLongDrivingSessionsReq.
+struct TSLongDailySessionsReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required string fleet
+  4: required i64 startTime
+  5: required i64 endTime
+  6: required i32 fetchSize
+  7: required i64 timeout
+}
+
+// Request payload of IClientRPCService.avgVsProjectedFuelConsumption.
+struct TSAvgVsProjectedFuelConsumptionReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required i32 fetchSize
+  4: required i64 timeout
+}
+
+// Request payload of IClientRPCService.avgDailyDrivingDuration.
+// NOTE(review): the unit and inclusiveness of startTime/endTime are not visible here - confirm.
+struct TSAvgDailyDrivingDurationReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required i64 startTime
+  4: required i64 endTime
+  5: required i32 fetchSize
+  6: required i64 timeout
+}
+
+// Request payload of IClientRPCService.avgDailyDrivingSession.
+// NOTE(review): the unit and inclusiveness of startTime/endTime are not visible here - confirm.
+struct TSAvgDailyDrivingSessionReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required i64 startTime
+  4: required i64 endTime
+  5: required i32 fetchSize
+  6: required i64 timeout
+}
+
+// Request payload of IClientRPCService.avgLoad.
+struct TSAvgLoadReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required i32 fetchSize
+  4: required i64 timeout
+}
+
+// Request payload of IClientRPCService.dailyActivity.
+// NOTE(review): the unit and inclusiveness of startTime/endTime are not visible here - confirm.
+struct TSDailyActivityReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required i64 startTime
+  4: required i64 endTime
+  5: required i32 fetchSize
+  6: required i64 timeout
+}
+
+// Request payload of IClientRPCService.breakdownFrequency.
+// NOTE(review): the unit and inclusiveness of startTime/endTime are not visible here - confirm.
+struct TSBreakdownFrequencyReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required i64 startTime
+  4: required i64 endTime
+  5: required i32 fetchSize
+  6: required i64 timeout
+}
+
+
 service IClientRPCService {
 
   TSExecuteStatementResp executeQueryStatementV2(1:TSExecuteStatementReq req);
@@ -624,6 +703,24 @@ service IClientRPCService {
 
   TSExecuteStatementResp executeAggregationQuery(1:TSAggregationQueryReq req);
 
+  TSExecuteStatementResp highLoad(1:TSHighLoadReq req);
+
+  TSExecuteStatementResp longDrivingSessions(1:TSLongDrivingSessionsReq req);
+
+  TSExecuteStatementResp longDailySessions(1:TSLongDailySessionsReq req);
+
+  TSExecuteStatementResp 
avgVsProjectedFuelConsumption(1:TSAvgVsProjectedFuelConsumptionReq req);
+
+  TSExecuteStatementResp 
avgDailyDrivingDuration(1:TSAvgDailyDrivingDurationReq req);
+
+  TSExecuteStatementResp avgDailyDrivingSession(1:TSAvgDailyDrivingSessionReq 
req);
+
+  TSExecuteStatementResp avgLoad(1:TSAvgLoadReq req);
+
+  TSExecuteStatementResp dailyActivity(1:TSDailyActivityReq req);
+
+  TSExecuteStatementResp breakdownFrequency(1:TSBreakdownFrequencyReq req);
+
   i64 requestStatementId(1:i64 sessionId);
 
   common.TSStatus createSchemaTemplate(1:TSCreateSchemaTemplateReq req);

Reply via email to