This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch tsbs/iot
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tsbs/iot by this push:
new 77c8ae3a674 support Q1-Q11
77c8ae3a674 is described below
commit 77c8ae3a674cd6e2af018b67407839aa2da89561
Author: JackieTien97 <[email protected]>
AuthorDate: Sun Apr 28 19:09:32 2024 +0800
support Q1-Q11
---
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 1201 ++++++++++++++++++++
.../queryengine/plan/statement/StatementType.java | 10 +
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 625 ++++++++++
.../thrift-datanode/src/main/thrift/client.thrift | 97 ++
4 files changed, 1933 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index fb8fc40ba46..320e8fc7bfb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -121,7 +121,12 @@ import
org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSAvgDailyDrivingDurationReq;
+import org.apache.iotdb.service.rpc.thrift.TSAvgDailyDrivingSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSAvgLoadReq;
+import org.apache.iotdb.service.rpc.thrift.TSAvgVsProjectedFuelConsumptionReq;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
+import org.apache.iotdb.service.rpc.thrift.TSBreakdownFrequencyReq;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
@@ -130,6 +135,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSDailyActivityReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
@@ -142,6 +148,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq;
+import org.apache.iotdb.service.rpc.thrift.TSHighLoadReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -151,6 +158,8 @@ import
org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSLongDailySessionsReq;
+import org.apache.iotdb.service.rpc.thrift.TSLongDrivingSessionsReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
@@ -165,6 +174,8 @@ import
org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Bytes;
import io.airlift.units.Duration;
import io.jsonwebtoken.lang.Strings;
import org.apache.commons.lang3.StringUtils;
@@ -191,6 +202,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -202,6 +214,13 @@ import static
org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
import static
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_DAILY_DRIVING_DURATION;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_DAILY_DRIVING_SESSION;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_LOAD;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_VS_PROJECTED_FUEL_CONSUMPTION;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.DAILY_ACTIVITY;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.HIGH_LOAD;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.LONG_DRIVING_SESSIONS;
import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
@@ -238,6 +257,258 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ private static final String NAME_COLUMN_NAME = "name"; // result header used by several *_HEADERS lists below
+
+ private static final String DRIVER_COLUMN_NAME = "DRIVER"; // NOTE(review): the only upper-case column name in this group - confirm intended
+
+ private static final String CURRENT_LOAD_COLUMN_NAME = "current_load"; // queried by the HIGH_LOAD and AVG_LOAD templates
+
+ private static final String LOAD_CAPACITY_COLUMN_NAME = "load_capacity"; // header only; never appears in a SQL template here
+
+ private static final String VELOCITY_COLUMN_NAME = "velocity"; // queried/filtered by the root.readings templates
+
+ private static final String FUEL_CONSUMPTION_COLUMN_NAME =
"fuel_consumption"; // summed and counted by the AVG_VS_PROJ_FUEL_CONSUMPTION template
+
+ private static final String NOMINAL_FUEL_CONSUMPTION_COLUMN_NAME =
"nominal_fuel_consumption"; // header only
+
+ private static final String FLEET_COLUMN_NAME = "fleet"; // header only
+
+ private static final String MODEL_COLUMN_NAME = "model"; // header only
+
+ private static final String STATUS_COLUMN_NAME = "status"; // queried by the DAILY_ACTIVITY template
+
+ // LAST query over current_load for every series under one fleet; %s is the fleet name.
+ // The blank after "last" is required: without it the concatenation produces the single
+ // identifier "lastcurrent_load" and the statement is no longer a LAST query.
+ private static final String HIGH_LOAD_SQL_TEMPLATE =
+ "select last " + CURRENT_LOAD_COLUMN_NAME + " from root.diagnostics.%s.**";
+
+ // Column names sent back to the client for the high-load query, in positional order.
+ private static final List<String> HIGH_LOAD_HEADERS =
+ ImmutableList.of(
+ NAME_COLUMN_NAME,
+ DRIVER_COLUMN_NAME,
+ CURRENT_LOAD_COLUMN_NAME,
+ LOAD_CAPACITY_COLUMN_NAME);
+
+ // String form of the column types, parallel to HIGH_LOAD_HEADERS.
+ private static final List<String> HIGH_LOAD_HEADER_DATA_TYPES =
+ ImmutableList.of(
+ TSDataType.TEXT.toString(),
+ TSDataType.TEXT.toString(),
+ TSDataType.DOUBLE.toString(),
+ TSDataType.DOUBLE.toString());
+
+ // Enum form of the same column types; public so code outside this class can reference it.
+ public static final List<TSDataType> HIGH_LOAD_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.DOUBLE, TSDataType.DOUBLE);
+
+ // Header name -> position in HIGH_LOAD_HEADERS; filled once by the static block below.
+ private static final Map<String, Integer> HIGH_LOAD_COLUMN_NAME_INDEX_MAP = new HashMap<>();
+
+ static {
+ for (int i = 0; i < HIGH_LOAD_HEADERS.size(); i++) {
+ HIGH_LOAD_COLUMN_NAME_INDEX_MAP.put(HIGH_LOAD_HEADERS.get(i), i);
+ }
+ }
+
+ // An empty BitSet serialises to a zero-length byte array, so this alias bitmap is empty.
+ private static final List<Byte> HIGH_LOAD_ALIAS_COLUMNS =
+ new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+ // 10-minute average velocity per device of one fleet, keeping only windows with avg > 1.
+ // Placeholders: %s fleet name, %d window start, %d window end.
+ // HAVING is placed before ALIGN BY DEVICE, matching the other HAVING templates in this class
+ // (AVG_DAILY_DRIVING_DURATION, DAILY_ACTIVITY) and the documented IoTDB clause order.
+ private static final String LONG_DRIVING_SESSIONS_SQL_TEMPLATE =
+ "select avg("
+ + VELOCITY_COLUMN_NAME
+ + ") from root.readings.%s.** GROUP BY([%d, %d), 10m) HAVING avg("
+ + VELOCITY_COLUMN_NAME
+ + ") > 1 ALIGN BY DEVICE";
+
+ // Column names sent back to the client, in positional order.
+ private static final List<String> LONG_DRIVING_SESSIONS_HEADERS =
+ ImmutableList.of(NAME_COLUMN_NAME, DRIVER_COLUMN_NAME);
+
+ // String form of the column types, parallel to LONG_DRIVING_SESSIONS_HEADERS.
+ private static final List<String> LONG_DRIVING_SESSIONS_HEADER_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT.toString(), TSDataType.TEXT.toString());
+
+ // Enum form of the same column types; public so code outside this class can reference it.
+ public static final List<TSDataType> LONG_DRIVING_SESSIONS_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT);
+
+ // Header name -> position in LONG_DRIVING_SESSIONS_HEADERS; filled by the static block below.
+ private static final Map<String, Integer> LONG_DRIVING_SESSIONS_COLUMN_NAME_INDEX_MAP =
+ new HashMap<>();
+
+ static {
+ for (int i = 0; i < LONG_DRIVING_SESSIONS_HEADERS.size(); i++) {
+ LONG_DRIVING_SESSIONS_COLUMN_NAME_INDEX_MAP.put(LONG_DRIVING_SESSIONS_HEADERS.get(i), i);
+ }
+ }
+
+ // An empty BitSet serialises to a zero-length byte array, so this alias bitmap is empty.
+ private static final List<Byte> LONG_DRIVING_SESSIONS_ALIAS_COLUMNS =
+ new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+ // Per-device sum and count of fuel_consumption over all readings where velocity > 1.
+ // The statement has no placeholders and is used verbatim.
+ private static final String AVG_VS_PROJ_FUEL_CONSUMPTION_SQL_TEMPLATE =
+ "select sum("
+ + FUEL_CONSUMPTION_COLUMN_NAME
+ + "), count("
+ + FUEL_CONSUMPTION_COLUMN_NAME
+ + ") from root.readings.** where "
+ + VELOCITY_COLUMN_NAME
+ + " > 1 ALIGN BY DEVICE";
+
+ // Column names sent back to the client, in positional order.
+ private static final List<String> AVG_VS_PROJ_FUEL_CONSUMPTION_HEADERS =
+ ImmutableList.of(
+ FLEET_COLUMN_NAME,
+ "avg_" + FUEL_CONSUMPTION_COLUMN_NAME,
+ NOMINAL_FUEL_CONSUMPTION_COLUMN_NAME);
+
+ // String form of the column types, parallel to the header list.
+ private static final List<String> AVG_VS_PROJ_FUEL_CONSUMPTION_HEADER_DATA_TYPES =
+ ImmutableList.of(
+ TSDataType.TEXT.toString(),
+ TSDataType.DOUBLE.toString(),
+ TSDataType.DOUBLE.toString());
+
+ // Enum form of the same column types.
+ public static final List<TSDataType> AVG_VS_PROJ_FUEL_CONSUMPTION_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT, TSDataType.DOUBLE, TSDataType.DOUBLE);
+
+ // Header name -> position in the header list.
+ private static final Map<String, Integer> AVG_VS_PROJ_FUEL_CONSUMPTION_COLUMN_NAME_INDEX_MAP =
+ new HashMap<>();
+
+ static {
+ int position = 0;
+ for (String header : AVG_VS_PROJ_FUEL_CONSUMPTION_HEADERS) {
+ AVG_VS_PROJ_FUEL_CONSUMPTION_COLUMN_NAME_INDEX_MAP.put(header, position++);
+ }
+ }
+
+ // Built from an empty BitSet, i.e. a zero-length byte list.
+ private static final List<Byte> AVG_VS_PROJ_FUEL_CONSUMPTION_ALIAS_COLUMNS =
+ new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+ // 10-minute average velocity per device over [%d, %d), keeping only windows with avg > 1.
+ private static final String AVG_DAILY_DRIVING_DURATION_SQL_TEMPLATE =
+ "select avg("
+ + VELOCITY_COLUMN_NAME
+ + ") from root.readings.** GROUP BY([%d, %d), 10m) HAVING avg("
+ + VELOCITY_COLUMN_NAME
+ + ") > 1 ALIGN BY DEVICE";
+
+ // Column names sent back to the client, in positional order.
+ private static final List<String> AVG_DAILY_DRIVING_DURATION_HEADERS =
+ ImmutableList.of(FLEET_COLUMN_NAME, NAME_COLUMN_NAME, DRIVER_COLUMN_NAME, "avg_daily_hours");
+
+ // String form of the column types, parallel to the header list.
+ private static final List<String> AVG_DAILY_DRIVING_DURATION_HEADER_DATA_TYPES =
+ ImmutableList.of(
+ TSDataType.TEXT.toString(),
+ TSDataType.TEXT.toString(),
+ TSDataType.TEXT.toString(),
+ TSDataType.DOUBLE.toString());
+
+ // Enum form of the same column types.
+ public static final List<TSDataType> AVG_DAILY_DRIVING_DURATION_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, TSDataType.DOUBLE);
+
+ // Header name -> position in the header list.
+ // NOTE(review): "DURATIONN" (double N) looks like a typo; the name is kept because it is
+ // referenced from the request handler, so a rename has to touch both places at once.
+ private static final Map<String, Integer> AVG_DAILY_DRIVING_DURATIONN_COLUMN_NAME_INDEX_MAP =
+ new HashMap<>();
+
+ static {
+ int position = 0;
+ for (String header : AVG_DAILY_DRIVING_DURATION_HEADERS) {
+ AVG_DAILY_DRIVING_DURATIONN_COLUMN_NAME_INDEX_MAP.put(header, position++);
+ }
+ }
+
+ // Built from an empty BitSet, i.e. a zero-length byte list.
+ private static final List<Byte> AVG_DAILY_DRIVING_DURATIONN_ALIAS_COLUMNS =
+ new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+ // 10-minute average velocity per device over [%d, %d); no HAVING filter in this template.
+ private static final String AVG_DAILY_DRIVING_SESSION_SQL_TEMPLATE =
+ "select avg("
+ + VELOCITY_COLUMN_NAME
+ + ") from root.readings.** GROUP BY([%d, %d), 10m) ALIGN BY DEVICE";
+
+ // Column names sent back to the client, in positional order.
+ private static final List<String> AVG_DAILY_DRIVING_SESSION_HEADERS =
+ ImmutableList.of(NAME_COLUMN_NAME, "avg_daily_driving_duration");
+
+ // String form of the column types, parallel to the header list.
+ private static final List<String> AVG_DAILY_DRIVING_SESSION_HEADER_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT.toString(), TSDataType.INT64.toString());
+
+ // Enum form of the same column types.
+ public static final List<TSDataType> AVG_DAILY_DRIVING_SESSION_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT, TSDataType.INT64);
+
+ // Header name -> position in the header list.
+ private static final Map<String, Integer> AVG_DAILY_DRIVING_SESSION_COLUMN_NAME_INDEX_MAP =
+ new HashMap<>();
+
+ static {
+ int position = 0;
+ for (String header : AVG_DAILY_DRIVING_SESSION_HEADERS) {
+ AVG_DAILY_DRIVING_SESSION_COLUMN_NAME_INDEX_MAP.put(header, position++);
+ }
+ }
+
+ // Built from an empty BitSet, i.e. a zero-length byte list.
+ private static final List<Byte> AVG_DAILY_DRIVING_SESSION_ALIAS_COLUMNS =
+ new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+ // Per-device average of current_load over all diagnostics; no placeholders.
+ private static final String AVG_LOAD_SQL_TEMPLATE =
+ "select avg(" + CURRENT_LOAD_COLUMN_NAME + ") from root.diagnostics.** ALIGN BY DEVICE";
+
+ // Column names sent back to the client, in positional order.
+ private static final List<String> AVG_LOAD_HEADERS =
+ ImmutableList.of(
+ FLEET_COLUMN_NAME,
+ MODEL_COLUMN_NAME,
+ LOAD_CAPACITY_COLUMN_NAME,
+ "avg_load_percentage");
+
+ // String form of the column types, parallel to AVG_LOAD_HEADERS.
+ // NOTE(review): load_capacity is TEXT here but DOUBLE in HIGH_LOAD_HEADER_DATA_TYPES -
+ // confirm against the result builder.
+ private static final List<String> AVG_LOAD_HEADER_DATA_TYPES =
+ ImmutableList.of(
+ TSDataType.TEXT.toString(),
+ TSDataType.TEXT.toString(),
+ TSDataType.TEXT.toString(),
+ TSDataType.DOUBLE.toString());
+
+ // Enum form of the same column types.
+ public static final List<TSDataType> AVG_LOAD_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, TSDataType.DOUBLE);
+
+ // Header name -> position in AVG_LOAD_HEADERS.
+ private static final Map<String, Integer> AVG_LOAD_COLUMN_NAME_INDEX_MAP = new HashMap<>();
+
+ static {
+ int position = 0;
+ for (String header : AVG_LOAD_HEADERS) {
+ AVG_LOAD_COLUMN_NAME_INDEX_MAP.put(header, position++);
+ }
+ }
+
+ // Built from an empty BitSet, i.e. a zero-length byte list.
+ private static final List<Byte> AVG_LOAD_ALIAS_COLUMNS =
+ new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+ // 10-minute average status per device over [%d, %d), keeping only windows with avg < 1.
+ private static final String DAILY_ACTIVITY_SQL_TEMPLATE =
+ "select avg("
+ + STATUS_COLUMN_NAME
+ + ") from root.diagnostics.** GROUP BY([%d, %d), 10m) HAVING avg("
+ + STATUS_COLUMN_NAME
+ + ") < 1 ALIGN BY DEVICE";
+
+ // Column names sent back to the client, in positional order.
+ private static final List<String> DAILY_ACTIVITY_HEADERS =
+ ImmutableList.of(FLEET_COLUMN_NAME, MODEL_COLUMN_NAME, "daily_activity");
+
+ // String form of the column types, parallel to DAILY_ACTIVITY_HEADERS.
+ private static final List<String> DAILY_ACTIVITY_HEADER_DATA_TYPES =
+ ImmutableList.of(
+ TSDataType.TEXT.toString(),
+ TSDataType.TEXT.toString(),
+ TSDataType.INT64.toString());
+
+ // Enum form of the same column types.
+ public static final List<TSDataType> DAILY_ACTIVITY_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT64);
+
+ // Header name -> position in DAILY_ACTIVITY_HEADERS.
+ private static final Map<String, Integer> DAILY_ACTIVITY_COLUMN_NAME_INDEX_MAP = new HashMap<>();
+
+ static {
+ int position = 0;
+ for (String header : DAILY_ACTIVITY_HEADERS) {
+ DAILY_ACTIVITY_COLUMN_NAME_INDEX_MAP.put(header, position++);
+ }
+ }
+
+ // Built from an empty BitSet, i.e. a zero-length byte list.
+ private static final List<Byte> DAILY_ACTIVITY_ALIAS_COLUMNS =
+ new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
+ private static final Map<String, DeviceAttributes> DEVICE_ATTRIBUTES_MAP =
new HashMap<>(); // handed to the QueryDataSetUtils result builders; NOTE(review): nothing in the visible code populates it - confirm where entries are added
+
+ /**
+ * Immutable value holder for three numeric attributes of a device; instances are the values
+ * of DEVICE_ATTRIBUTES_MAP.
+ */
+ public static class DeviceAttributes {
+ /** Nominal fuel consumption of the device. */
+ public final double nominalFuelConsumption;
+
+ /** Load capacity of the device. */
+ public final double loadCapacity;
+
+ /** Fuel capacity of the device. */
+ public final double fuelCapacity;
+
+ /** Stores the three values unchanged; no validation is performed. */
+ public DeviceAttributes(
+ double nominalFuelConsumption, double loadCapacity, double fuelCapacity) {
+ this.fuelCapacity = fuelCapacity;
+ this.loadCapacity = loadCapacity;
+ this.nominalFuelConsumption = nominalFuelConsumption;
+ }
+ }
+
+ public static final int FLEET_LEVEL = 2; // presumably the node index of the fleet within a device path - TODO confirm against the users of this constant
+ public static final int MODEL_LEVEL = 3; // presumably the node index of the model - TODO confirm
+ public static final int NAME_LEVEL = 4; // presumably the node index of the truck name - TODO confirm
+ public static final int DRIVER_LEVEL = 5; // presumably the node index of the driver - TODO confirm
+
@FunctionalInterface
public interface SelectResult {
boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution,
int fetchSize)
@@ -2184,6 +2455,936 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
return executeAggregationQueryInternal(req, OLD_SELECT_RESULT);
}
+ /**
+ * Runs the high-load query for the fleet named in {@code req.fleet}: formats
+ * HIGH_LOAD_SQL_TEMPLATE, executes the statement through COORDINATOR and returns the buffers
+ * built by QueryDataSetUtils.constructHighLoadResult under the fixed HIGH_LOAD_* headers.
+ *
+ * <p>Returns early with: the not-logged-in status; SQL_PARSE_ERROR when no statement object is
+ * created; the authority-check status when it is not SUCCESS; the execution status when it is
+ * neither SUCCESS nor REDIRECTION_RECOMMEND. Exceptions are turned into a response via
+ * onQueryException; Errors are rethrown. The finally block always records timing, refreshes
+ * the session idle time and closes the quota if one was obtained.
+ *
+ * <p>NOTE(review): req.fleet is interpolated into the SQL text without validation - confirm
+ * callers are trusted or restrict the accepted characters.
+ */ @Override
+ public TSExecuteStatementResp highLoad(TSHighLoadReq req) throws TException {
+ boolean finished = false; // true once the result is complete or the call failed; gates cleanup in finally
+ long queryId = Long.MIN_VALUE; // placeholder until requestQueryId assigns a real id
+ String statement = String.format(HIGH_LOAD_SQL_TEMPLATE, req.fleet);
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ // quota handle: stays null until checkQuota returns, closed in finally when non-null
+ OperationQuota quota = null;
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.nanoTime();
+ StatementType statementType = null; // set after the quota check; null means latency is not recorded
+ Throwable t = null; // failure handed to cleanupQueryExecution
+ try {
+ Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
+
+ if (s == null) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not
supported"));
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ quota =
+ DataNodeThrottleQuotaManager.getInstance()
+ .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ statementType = s.getType();
+ if (ENABLE_AUDIT_LOG) {
+ AuditLogger.log(statement, s);
+ }
+
+ queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.executeForTreeModel(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ statement,
+ partitionFetcher,
+ schemaFetcher,
+ req.getTimeout());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution != null && queryExecution.isQuery()) {
+ resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+ resp.setColumnNameIndexMap(HIGH_LOAD_COLUMN_NAME_INDEX_MAP);
+ resp.setSgColumns(Collections.emptyList());
+ resp.setColumns(HIGH_LOAD_HEADERS);
+ resp.setDataTypeList(HIGH_LOAD_HEADER_DATA_TYPES);
+ resp.setAliasColumns(HIGH_LOAD_ALIAS_COLUMNS);
+ resp.setIgnoreTimeStamp(false); // the sibling handlers visible here pass true
+ resp.setQueryId(queryId);
+ resp.setStatus(result.status); // replaces the SUCCESS placeholder with the execution status
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.constructHighLoadResult(
+ queryExecution, DEVICE_ATTRIBUTES_MAP, serde);
+ finished = pair.right; // moreData below is the negation of this flag
+ resp.setQueryResult(pair.left);
+ resp.setMoreData(!finished);
+ quota.addReadResult(resp.getQueryResult());
+ } else {
+ finished = true;
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + statement + "\". " +
OperationType.EXECUTE_STATEMENT));
+ } catch (Error error) { // finished is not set here, so cleanup in finally runs only if it was already true
+ t = error;
+ throw error;
+ } finally {
+ long currentOperationCost = System.nanoTime() - startTime;
+ COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+ // record each operation time cost
+ if (statementType != null) {
+ addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT, HIGH_LOAD.name(),
currentOperationCost);
+ }
+
+ if (finished) {
+ // record total time cost for one query
+ long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+ addQueryLatency(HIGH_LOAD, executionTime > 0 ? executionTime :
currentOperationCost);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
+ }
+ SESSION_MANAGER.updateIdleTime();
+ if (quota != null) {
+ quota.close();
+ }
+ }
+ }
+
+ /**
+ * Runs the long-driving-sessions query: formats LONG_DRIVING_SESSIONS_SQL_TEMPLATE with
+ * {@code req.fleet}, {@code req.startTime} and {@code req.endTime}, executes it through
+ * COORDINATOR and returns the buffers built by
+ * QueryDataSetUtils.constructLongDrivingSessionsResult under the LONG_DRIVING_SESSIONS_*
+ * headers.
+ *
+ * <p>Returns early with: the not-logged-in status; SQL_PARSE_ERROR when no statement object is
+ * created; the authority-check status when it is not SUCCESS; the execution status when it is
+ * neither SUCCESS nor REDIRECTION_RECOMMEND. Exceptions are turned into a response via
+ * onQueryException; Errors are rethrown. The finally block always records timing, refreshes
+ * the session idle time and closes the quota if one was obtained.
+ *
+ * <p>NOTE(review): req.fleet is interpolated into the SQL text without validation - confirm
+ * callers are trusted or restrict the accepted characters.
+ */ @Override
+ public TSExecuteStatementResp longDrivingSessions(TSLongDrivingSessionsReq
req)
+ throws TException {
+ boolean finished = false; // true once the result is complete or the call failed; gates cleanup in finally
+ long queryId = Long.MIN_VALUE; // placeholder until requestQueryId assigns a real id
+ String statement =
+ String.format(LONG_DRIVING_SESSIONS_SQL_TEMPLATE, req.fleet,
req.startTime, req.endTime);
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ // quota handle: stays null until checkQuota returns, closed in finally when non-null
+ OperationQuota quota = null;
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.nanoTime();
+ StatementType statementType = null; // set after the quota check; null means latency is not recorded
+ Throwable t = null; // failure handed to cleanupQueryExecution
+ try {
+ Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
+
+ if (s == null) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not
supported"));
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ quota =
+ DataNodeThrottleQuotaManager.getInstance()
+ .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ statementType = s.getType();
+ if (ENABLE_AUDIT_LOG) {
+ AuditLogger.log(statement, s);
+ }
+
+ queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.executeForTreeModel(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ statement,
+ partitionFetcher,
+ schemaFetcher,
+ req.getTimeout());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution != null && queryExecution.isQuery()) {
+ resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+
resp.setColumnNameIndexMap(LONG_DRIVING_SESSIONS_COLUMN_NAME_INDEX_MAP);
+ resp.setSgColumns(Collections.emptyList());
+ resp.setColumns(LONG_DRIVING_SESSIONS_HEADERS);
+ resp.setDataTypeList(LONG_DRIVING_SESSIONS_HEADER_DATA_TYPES);
+ resp.setAliasColumns(LONG_DRIVING_SESSIONS_ALIAS_COLUMNS);
+ resp.setIgnoreTimeStamp(true);
+ resp.setQueryId(queryId);
+ resp.setStatus(result.status); // replaces the SUCCESS placeholder with the execution status
+ Pair<List<ByteBuffer>, Boolean> pair =
+
QueryDataSetUtils.constructLongDrivingSessionsResult(queryExecution, serde, 22); // 22: threshold argument whose meaning is defined by the result builder (not visible here)
+ finished = pair.right; // moreData below is the negation of this flag
+ resp.setQueryResult(pair.left);
+ resp.setMoreData(!finished);
+ quota.addReadResult(resp.getQueryResult());
+ } else {
+ finished = true;
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + statement + "\". " +
OperationType.EXECUTE_STATEMENT));
+ } catch (Error error) { // finished is not set here, so cleanup in finally runs only if it was already true
+ t = error;
+ throw error;
+ } finally {
+ long currentOperationCost = System.nanoTime() - startTime;
+ COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+ // record each operation time cost
+ if (statementType != null) {
+ addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT,
+ LONG_DRIVING_SESSIONS.name(),
+ currentOperationCost);
+ }
+
+ if (finished) {
+ // record total time cost for one query
+ long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+ addQueryLatency(
+ LONG_DRIVING_SESSIONS, executionTime > 0 ? executionTime :
currentOperationCost);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
+ }
+ SESSION_MANAGER.updateIdleTime();
+ if (quota != null) {
+ quota.close();
+ }
+ }
+ }
+
+ /**
+ * Runs the long-daily-sessions query. It uses the same SQL template, response headers and
+ * result builder as longDrivingSessions; the only visible difference is the constant 60
+ * (instead of 22) passed to QueryDataSetUtils.constructLongDrivingSessionsResult.
+ *
+ * <p>Returns early with: the not-logged-in status; SQL_PARSE_ERROR when no statement object is
+ * created; the authority-check status when it is not SUCCESS; the execution status when it is
+ * neither SUCCESS nor REDIRECTION_RECOMMEND. Exceptions are turned into a response via
+ * onQueryException; Errors are rethrown. The finally block always records timing, refreshes
+ * the session idle time and closes the quota if one was obtained.
+ *
+ * <p>NOTE(review): latency is recorded under LONG_DRIVING_SESSIONS, so the two endpoints are
+ * indistinguishable in metrics - confirm whether a dedicated statement type should be used.
+ * NOTE(review): req.fleet is interpolated into the SQL text without validation.
+ */ @Override
+ public TSExecuteStatementResp longDailySessions(TSLongDailySessionsReq req)
throws TException {
+ boolean finished = false; // true once the result is complete or the call failed; gates cleanup in finally
+ long queryId = Long.MIN_VALUE; // placeholder until requestQueryId assigns a real id
+ String statement =
+ String.format(LONG_DRIVING_SESSIONS_SQL_TEMPLATE, req.fleet,
req.startTime, req.endTime);
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ // quota handle: stays null until checkQuota returns, closed in finally when non-null
+ OperationQuota quota = null;
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.nanoTime();
+ StatementType statementType = null; // set after the quota check; null means latency is not recorded
+ Throwable t = null; // failure handed to cleanupQueryExecution
+ try {
+ Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
+
+ if (s == null) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not
supported"));
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ quota =
+ DataNodeThrottleQuotaManager.getInstance()
+ .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ statementType = s.getType();
+ if (ENABLE_AUDIT_LOG) {
+ AuditLogger.log(statement, s);
+ }
+
+ queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.executeForTreeModel(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ statement,
+ partitionFetcher,
+ schemaFetcher,
+ req.getTimeout());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution != null && queryExecution.isQuery()) {
+ resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+
resp.setColumnNameIndexMap(LONG_DRIVING_SESSIONS_COLUMN_NAME_INDEX_MAP);
+ resp.setSgColumns(Collections.emptyList());
+ resp.setColumns(LONG_DRIVING_SESSIONS_HEADERS);
+ resp.setDataTypeList(LONG_DRIVING_SESSIONS_HEADER_DATA_TYPES);
+ resp.setAliasColumns(LONG_DRIVING_SESSIONS_ALIAS_COLUMNS);
+ resp.setIgnoreTimeStamp(true);
+ resp.setQueryId(queryId);
+ resp.setStatus(result.status); // replaces the SUCCESS placeholder with the execution status
+ Pair<List<ByteBuffer>, Boolean> pair =
+
QueryDataSetUtils.constructLongDrivingSessionsResult(queryExecution, serde, 60); // 60 here vs 22 in longDrivingSessions; meaning defined by the result builder (not visible here)
+ finished = pair.right; // moreData below is the negation of this flag
+ resp.setQueryResult(pair.left);
+ resp.setMoreData(!finished);
+ quota.addReadResult(resp.getQueryResult());
+ } else {
+ finished = true;
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + statement + "\". " +
OperationType.EXECUTE_STATEMENT));
+ } catch (Error error) { // finished is not set here, so cleanup in finally runs only if it was already true
+ t = error;
+ throw error;
+ } finally {
+ long currentOperationCost = System.nanoTime() - startTime;
+ COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+ // record each operation time cost
+ if (statementType != null) {
+ addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT,
+ LONG_DRIVING_SESSIONS.name(),
+ currentOperationCost);
+ }
+
+ if (finished) {
+ // record total time cost for one query
+ long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+ addQueryLatency(
+ LONG_DRIVING_SESSIONS, executionTime > 0 ? executionTime :
currentOperationCost);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
+ }
+ SESSION_MANAGER.updateIdleTime();
+ if (quota != null) {
+ quota.close();
+ }
+ }
+ }
+
+ /**
+ * Runs the average-vs-projected fuel consumption query: executes the fixed statement
+ * AVG_VS_PROJ_FUEL_CONSUMPTION_SQL_TEMPLATE (no request values are substituted) through
+ * COORDINATOR and returns the buffers built by
+ * QueryDataSetUtils.constructAvgVsProjectedFuelConsumptionResult under the
+ * AVG_VS_PROJ_FUEL_CONSUMPTION_* headers.
+ *
+ * <p>Returns early with: the not-logged-in status; SQL_PARSE_ERROR when no statement object is
+ * created; the authority-check status when it is not SUCCESS; the execution status when it is
+ * neither SUCCESS nor REDIRECTION_RECOMMEND. Exceptions are turned into a response via
+ * onQueryException; Errors are rethrown. The finally block always records timing, refreshes
+ * the session idle time and closes the quota if one was obtained.
+ */ @Override
+ public TSExecuteStatementResp avgVsProjectedFuelConsumption(
+ TSAvgVsProjectedFuelConsumptionReq req) throws TException {
+ boolean finished = false; // true once the result is complete or the call failed; gates cleanup in finally
+ long queryId = Long.MIN_VALUE; // placeholder until requestQueryId assigns a real id
+ String statement = AVG_VS_PROJ_FUEL_CONSUMPTION_SQL_TEMPLATE;
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ // quota handle: stays null until checkQuota returns, closed in finally when non-null
+ OperationQuota quota = null;
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.nanoTime();
+ StatementType statementType = null; // set after the quota check; null means latency is not recorded
+ Throwable t = null; // failure handed to cleanupQueryExecution
+ try {
+ Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
+
+ if (s == null) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not
supported"));
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ quota =
+ DataNodeThrottleQuotaManager.getInstance()
+ .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ statementType = s.getType();
+ if (ENABLE_AUDIT_LOG) {
+ AuditLogger.log(statement, s);
+ }
+
+ queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.executeForTreeModel(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ statement,
+ partitionFetcher,
+ schemaFetcher,
+ req.getTimeout());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution != null && queryExecution.isQuery()) {
+ resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+
resp.setColumnNameIndexMap(AVG_VS_PROJ_FUEL_CONSUMPTION_COLUMN_NAME_INDEX_MAP);
+ resp.setSgColumns(Collections.emptyList());
+ resp.setColumns(AVG_VS_PROJ_FUEL_CONSUMPTION_HEADERS);
+ resp.setDataTypeList(AVG_VS_PROJ_FUEL_CONSUMPTION_HEADER_DATA_TYPES);
+ resp.setAliasColumns(AVG_VS_PROJ_FUEL_CONSUMPTION_ALIAS_COLUMNS);
+ resp.setIgnoreTimeStamp(true);
+ resp.setQueryId(queryId);
+ resp.setStatus(result.status); // replaces the SUCCESS placeholder with the execution status
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.constructAvgVsProjectedFuelConsumptionResult(
+ queryExecution, DEVICE_ATTRIBUTES_MAP, serde);
+ finished = pair.right; // moreData below is the negation of this flag
+ resp.setQueryResult(pair.left);
+ resp.setMoreData(!finished);
+ quota.addReadResult(resp.getQueryResult());
+ } else {
+ finished = true;
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + statement + "\". " +
OperationType.EXECUTE_STATEMENT));
+ } catch (Error error) { // finished is not set here, so cleanup in finally runs only if it was already true
+ t = error;
+ throw error;
+ } finally {
+ long currentOperationCost = System.nanoTime() - startTime;
+ COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+ // record each operation time cost
+ if (statementType != null) {
+ addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT,
+ AVG_VS_PROJECTED_FUEL_CONSUMPTION.name(),
+ currentOperationCost);
+ }
+
+ if (finished) {
+ // record total time cost for one query
+ long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+ addQueryLatency(
+ AVG_VS_PROJECTED_FUEL_CONSUMPTION,
+ executionTime > 0 ? executionTime : currentOperationCost);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
+ }
+ SESSION_MANAGER.updateIdleTime();
+ if (quota != null) {
+ quota.close();
+ }
+ }
+ }
+
+ /**
+ * Runs the built-in "average daily driving duration" query for the window given by
+ * {@code req.startTime} and {@code req.endTime} and returns the rows under a fixed header.
+ *
+ * <p>The SQL text is produced from {@code AVG_DAILY_DRIVING_DURATION_SQL_TEMPLATE}. The statement
+ * is parsed, authority-checked, quota-checked, optionally audit-logged and then executed through
+ * {@code COORDINATOR.executeForTreeModel}. Result batches are converted by
+ * {@code QueryDataSetUtils.constructAvgDailyDrivingDurationResult}.
+ *
+ * @param req request supplying startTime, endTime, statementId and timeout
+ * @return the query response, or a response carrying only a status when the session is not logged
+ *     in, the statement cannot be created, the authority check or execution fails, or an
+ *     exception is caught
+ * @throws TException declared by the overridden RPC signature
+ */
+ @Override
+ public TSExecuteStatementResp
avgDailyDrivingDuration(TSAvgDailyDrivingDurationReq req)
+ throws TException {
+ boolean finished = false;
+ // sentinel until a real id is requested from SESSION_MANAGER below
+ long queryId = Long.MIN_VALUE;
+ String statement =
+ String.format(AVG_DAILY_DRIVING_DURATION_SQL_TEMPLATE, req.startTime,
req.endTime);
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ // quota
+ OperationQuota quota = null;
+ // sessions that are not logged in get a status-only response; the try/finally below is not entered
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.nanoTime();
+ StatementType statementType = null;
+ Throwable t = null;
+ try {
+ Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
+
+ if (s == null) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not
supported"));
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ quota =
+ DataNodeThrottleQuotaManager.getInstance()
+ .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ // statementType stays null on every earlier exit, which disables the per-operation latency
+ // metric in the finally block for those paths
+ statementType = s.getType();
+ if (ENABLE_AUDIT_LOG) {
+ AuditLogger.log(statement, s);
+ }
+
+ queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.executeForTreeModel(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ statement,
+ partitionFetcher,
+ schemaFetcher,
+ req.getTimeout());
+
+ // NOTE(review): this early return leaves finished == false, so the finally block does not call
+ // cleanupQueryExecution for the queryId requested above - confirm it is released elsewhere
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution != null && queryExecution.isQuery()) {
+ resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+ // the header comes from constants rather than from the executed plan; two of the constant
+ // names below are spelled "DURATIONN" (they are declared outside this method)
+
resp.setColumnNameIndexMap(AVG_DAILY_DRIVING_DURATIONN_COLUMN_NAME_INDEX_MAP);
+ resp.setSgColumns(Collections.emptyList());
+ resp.setColumns(AVG_DAILY_DRIVING_DURATION_HEADERS);
+ resp.setDataTypeList(AVG_DAILY_DRIVING_DURATION_HEADER_DATA_TYPES);
+ resp.setAliasColumns(AVG_DAILY_DRIVING_DURATIONN_ALIAS_COLUMNS);
+ resp.setIgnoreTimeStamp(false);
+ resp.setQueryId(queryId);
+ resp.setStatus(result.status);
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.constructAvgDailyDrivingDurationResult(
+ queryExecution, serde, req.startTime, req.endTime);
+ // pair.right says whether the result set is exhausted; it drives both moreData and cleanup
+ finished = pair.right;
+ resp.setQueryResult(pair.left);
+ resp.setMoreData(!finished);
+ quota.addReadResult(resp.getQueryResult());
+ } else {
+ finished = true;
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ // an exception ends the query: mark it finished so the finally block cleans it up
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + statement + "\". " +
OperationType.EXECUTE_STATEMENT));
+ } catch (Error error) {
+ // finished is not changed here; t reaches cleanupQueryExecution only if finished is already true
+ t = error;
+ throw error;
+ } finally {
+ long currentOperationCost = System.nanoTime() - startTime;
+ COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+ // record each operation time cost
+ if (statementType != null) {
+ addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT,
+ AVG_DAILY_DRIVING_DURATION.name(),
+ currentOperationCost);
+ }
+
+ if (finished) {
+ // record total time cost for one query
+ long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+ addQueryLatency(
+ AVG_DAILY_DRIVING_DURATION, executionTime > 0 ? executionTime :
currentOperationCost);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
+ }
+ SESSION_MANAGER.updateIdleTime();
+ if (quota != null) {
+ quota.close();
+ }
+ }
+ }
+
+ /**
+ * Runs the built-in "average daily driving session" query for the window given by
+ * {@code req.startTime} and {@code req.endTime} and returns the rows under a fixed header.
+ *
+ * <p>The SQL text is produced from {@code AVG_DAILY_DRIVING_SESSION_SQL_TEMPLATE}. The statement is
+ * parsed, authority-checked, quota-checked, optionally audit-logged and then executed through
+ * {@code COORDINATOR.executeForTreeModel}. Result batches are converted by
+ * {@code QueryDataSetUtils.constructAvgDailyDrivingSessionResult}.
+ *
+ * @param req request supplying startTime, endTime, statementId and timeout
+ * @return the query response, or a response carrying only a status when the session is not logged
+ *     in, the statement cannot be created, the authority check or execution fails, or an
+ *     exception is caught
+ * @throws TException declared by the overridden RPC signature
+ */
+ @Override
+ public TSExecuteStatementResp
avgDailyDrivingSession(TSAvgDailyDrivingSessionReq req)
+ throws TException {
+ boolean finished = false;
+ // sentinel until a real id is requested from SESSION_MANAGER below
+ long queryId = Long.MIN_VALUE;
+ String statement =
+ String.format(AVG_DAILY_DRIVING_SESSION_SQL_TEMPLATE, req.startTime,
req.endTime);
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ // quota
+ OperationQuota quota = null;
+ // sessions that are not logged in get a status-only response; the try/finally below is not entered
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.nanoTime();
+ StatementType statementType = null;
+ Throwable t = null;
+ try {
+ Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
+
+ if (s == null) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not
supported"));
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ quota =
+ DataNodeThrottleQuotaManager.getInstance()
+ .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ // statementType stays null on every earlier exit, which disables the per-operation latency
+ // metric in the finally block for those paths
+ statementType = s.getType();
+ if (ENABLE_AUDIT_LOG) {
+ AuditLogger.log(statement, s);
+ }
+
+ queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.executeForTreeModel(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ statement,
+ partitionFetcher,
+ schemaFetcher,
+ req.getTimeout());
+
+ // NOTE(review): this early return leaves finished == false, so the finally block does not call
+ // cleanupQueryExecution for the queryId requested above - confirm it is released elsewhere
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution != null && queryExecution.isQuery()) {
+ resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+ // the header comes from constants rather than from the executed plan
+
resp.setColumnNameIndexMap(AVG_DAILY_DRIVING_SESSION_COLUMN_NAME_INDEX_MAP);
+ resp.setSgColumns(Collections.emptyList());
+ resp.setColumns(AVG_DAILY_DRIVING_SESSION_HEADERS);
+ resp.setDataTypeList(AVG_DAILY_DRIVING_SESSION_HEADER_DATA_TYPES);
+ resp.setAliasColumns(AVG_DAILY_DRIVING_SESSION_ALIAS_COLUMNS);
+ resp.setIgnoreTimeStamp(false);
+ resp.setQueryId(queryId);
+ resp.setStatus(result.status);
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.constructAvgDailyDrivingSessionResult(
+ queryExecution, serde, req.startTime, req.endTime);
+ // pair.right says whether the result set is exhausted; it drives both moreData and cleanup
+ finished = pair.right;
+ resp.setQueryResult(pair.left);
+ resp.setMoreData(!finished);
+ quota.addReadResult(resp.getQueryResult());
+ } else {
+ finished = true;
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ // an exception ends the query: mark it finished so the finally block cleans it up
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + statement + "\". " +
OperationType.EXECUTE_STATEMENT));
+ } catch (Error error) {
+ // finished is not changed here; t reaches cleanupQueryExecution only if finished is already true
+ t = error;
+ throw error;
+ } finally {
+ long currentOperationCost = System.nanoTime() - startTime;
+ COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+ // record each operation time cost
+ if (statementType != null) {
+ addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT,
+ AVG_DAILY_DRIVING_SESSION.name(),
+ currentOperationCost);
+ }
+
+ if (finished) {
+ // record total time cost for one query
+ long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+ addQueryLatency(
+ AVG_DAILY_DRIVING_SESSION, executionTime > 0 ? executionTime :
currentOperationCost);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
+ }
+ SESSION_MANAGER.updateIdleTime();
+ if (quota != null) {
+ quota.close();
+ }
+ }
+ }
+
+ /**
+ * Runs the built-in "average load" query and returns the rows under a fixed header.
+ *
+ * <p>The SQL text is {@code AVG_LOAD_SQL_TEMPLATE} used as-is (no request values are substituted).
+ * The statement is parsed, authority-checked, quota-checked, optionally audit-logged and then
+ * executed through {@code COORDINATOR.executeForTreeModel}. Result batches are converted by
+ * {@code QueryDataSetUtils.constructAvgLoadResult}, which is also given
+ * {@code DEVICE_ATTRIBUTES_MAP}. The response is marked with {@code ignoreTimeStamp = true}.
+ *
+ * @param req request supplying statementId and timeout
+ * @return the query response, or a response carrying only a status when the session is not logged
+ *     in, the statement cannot be created, the authority check or execution fails, or an
+ *     exception is caught
+ * @throws TException declared by the overridden RPC signature
+ */
+ @Override
+ public TSExecuteStatementResp avgLoad(TSAvgLoadReq req) throws TException {
+ boolean finished = false;
+ // sentinel until a real id is requested from SESSION_MANAGER below
+ long queryId = Long.MIN_VALUE;
+ String statement = AVG_LOAD_SQL_TEMPLATE;
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ // quota
+ OperationQuota quota = null;
+ // sessions that are not logged in get a status-only response; the try/finally below is not entered
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.nanoTime();
+ StatementType statementType = null;
+ Throwable t = null;
+ try {
+ Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
+
+ if (s == null) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not
supported"));
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ quota =
+ DataNodeThrottleQuotaManager.getInstance()
+ .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ // statementType stays null on every earlier exit, which disables the per-operation latency
+ // metric in the finally block for those paths
+ statementType = s.getType();
+ if (ENABLE_AUDIT_LOG) {
+ AuditLogger.log(statement, s);
+ }
+
+ queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.executeForTreeModel(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ statement,
+ partitionFetcher,
+ schemaFetcher,
+ req.getTimeout());
+
+ // NOTE(review): this early return leaves finished == false, so the finally block does not call
+ // cleanupQueryExecution for the queryId requested above - confirm it is released elsewhere
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution != null && queryExecution.isQuery()) {
+ resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+ // the header comes from constants rather than from the executed plan
+ resp.setColumnNameIndexMap(AVG_LOAD_COLUMN_NAME_INDEX_MAP);
+ resp.setSgColumns(Collections.emptyList());
+ resp.setColumns(AVG_LOAD_HEADERS);
+ resp.setDataTypeList(AVG_LOAD_HEADER_DATA_TYPES);
+ resp.setAliasColumns(AVG_LOAD_ALIAS_COLUMNS);
+ resp.setIgnoreTimeStamp(true);
+ resp.setQueryId(queryId);
+ resp.setStatus(result.status);
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.constructAvgLoadResult(
+ queryExecution, DEVICE_ATTRIBUTES_MAP, serde);
+ // pair.right says whether the result set is exhausted; it drives both moreData and cleanup
+ finished = pair.right;
+ resp.setQueryResult(pair.left);
+ resp.setMoreData(!finished);
+ quota.addReadResult(resp.getQueryResult());
+ } else {
+ finished = true;
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ // an exception ends the query: mark it finished so the finally block cleans it up
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + statement + "\". " +
OperationType.EXECUTE_STATEMENT));
+ } catch (Error error) {
+ // finished is not changed here; t reaches cleanupQueryExecution only if finished is already true
+ t = error;
+ throw error;
+ } finally {
+ long currentOperationCost = System.nanoTime() - startTime;
+ COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+ // record each operation time cost
+ if (statementType != null) {
+ addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT, AVG_LOAD.name(),
currentOperationCost);
+ }
+
+ if (finished) {
+ // record total time cost for one query
+ long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+ addQueryLatency(AVG_LOAD, executionTime > 0 ? executionTime :
currentOperationCost);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
+ }
+ SESSION_MANAGER.updateIdleTime();
+ if (quota != null) {
+ quota.close();
+ }
+ }
+ }
+
+ /**
+ * Runs the built-in "daily activity" query for the window given by {@code req.startTime} and
+ * {@code req.endTime} and returns the rows under a fixed header.
+ *
+ * <p>The SQL text is produced from {@code DAILY_ACTIVITY_SQL_TEMPLATE}. The statement is parsed,
+ * authority-checked, quota-checked, optionally audit-logged and then executed through
+ * {@code COORDINATOR.executeForTreeModel}. Result batches are converted by
+ * {@code QueryDataSetUtils.constructDailyActivityResult}.
+ *
+ * @param req request supplying startTime, endTime, statementId and timeout
+ * @return the query response, or a response carrying only a status when the session is not logged
+ *     in, the statement cannot be created, the authority check or execution fails, or an
+ *     exception is caught
+ * @throws TException declared by the overridden RPC signature
+ */
+ @Override
+ public TSExecuteStatementResp dailyActivity(TSDailyActivityReq req) throws
TException {
+ boolean finished = false;
+ // sentinel until a real id is requested from SESSION_MANAGER below
+ long queryId = Long.MIN_VALUE;
+ String statement = String.format(DAILY_ACTIVITY_SQL_TEMPLATE,
req.startTime, req.endTime);
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ // quota
+ OperationQuota quota = null;
+ // sessions that are not logged in get a status-only response; the try/finally below is not entered
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.nanoTime();
+ StatementType statementType = null;
+ Throwable t = null;
+ try {
+ Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
+
+ if (s == null) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not
supported"));
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ quota =
+ DataNodeThrottleQuotaManager.getInstance()
+ .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ // statementType stays null on every earlier exit, which disables the per-operation latency
+ // metric in the finally block for those paths
+ statementType = s.getType();
+ if (ENABLE_AUDIT_LOG) {
+ AuditLogger.log(statement, s);
+ }
+
+ queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.executeForTreeModel(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ statement,
+ partitionFetcher,
+ schemaFetcher,
+ req.getTimeout());
+
+ // NOTE(review): this early return leaves finished == false, so the finally block does not call
+ // cleanupQueryExecution for the queryId requested above - confirm it is released elsewhere
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution != null && queryExecution.isQuery()) {
+ resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+ // the header comes from constants rather than from the executed plan
+ resp.setColumnNameIndexMap(DAILY_ACTIVITY_COLUMN_NAME_INDEX_MAP);
+ resp.setSgColumns(Collections.emptyList());
+ resp.setColumns(DAILY_ACTIVITY_HEADERS);
+ resp.setDataTypeList(DAILY_ACTIVITY_HEADER_DATA_TYPES);
+ resp.setAliasColumns(DAILY_ACTIVITY_ALIAS_COLUMNS);
+ resp.setIgnoreTimeStamp(false);
+ resp.setQueryId(queryId);
+ resp.setStatus(result.status);
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.constructDailyActivityResult(queryExecution,
serde);
+ // pair.right says whether the result set is exhausted; it drives both moreData and cleanup
+ finished = pair.right;
+ resp.setQueryResult(pair.left);
+ resp.setMoreData(!finished);
+ quota.addReadResult(resp.getQueryResult());
+ } else {
+ finished = true;
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ // an exception ends the query: mark it finished so the finally block cleans it up
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + statement + "\". " +
OperationType.EXECUTE_STATEMENT));
+ } catch (Error error) {
+ // finished is not changed here; t reaches cleanupQueryExecution only if finished is already true
+ t = error;
+ throw error;
+ } finally {
+ long currentOperationCost = System.nanoTime() - startTime;
+ COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+ // record each operation time cost
+ if (statementType != null) {
+ addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT, DAILY_ACTIVITY.name(),
currentOperationCost);
+ }
+
+ if (finished) {
+ // record total time cost for one query
+ long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+ addQueryLatency(DAILY_ACTIVITY, executionTime > 0 ? executionTime :
currentOperationCost);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
+ }
+ SESSION_MANAGER.updateIdleTime();
+ if (quota != null) {
+ quota.close();
+ }
+ }
+ }
+
+  /**
+   * Placeholder for the "breakdown frequency" query, which is not implemented.
+   *
+   * <p>Returning {@code null} from an RPC handler leaves the reply without a result, so the caller
+   * only sees an opaque transport-level failure. An explicit status is returned instead so clients
+   * can tell that the operation is unsupported.
+   *
+   * @param req the request; currently ignored
+   * @return a response that carries only an {@code UNSUPPORTED_OPERATION} status
+   * @throws TException declared by the overridden RPC signature
+   */
+  @Override
+  public TSExecuteStatementResp breakdownFrequency(TSBreakdownFrequencyReq req) throws TException {
+    return RpcUtils.getTSExecuteStatementResp(
+        RpcUtils.getStatus(
+            TSStatusCode.UNSUPPORTED_OPERATION, "breakdownFrequency is not supported yet"));
+  }
+
@Override
public long requestStatementId(long sessionId) {
return
SESSION_MANAGER.requestStatementId(SESSION_MANAGER.getCurrSession());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index 88906ba83f4..817e427fb57 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -184,4 +184,14 @@ public enum StatementType {
SHOW_TOPICS,
SHOW_SUBSCRIPTIONS,
+
+ HIGH_LOAD,
+ LONG_DRIVING_SESSIONS,
+ LONG_DAILY_SESSIONS,
+ AVG_VS_PROJECTED_FUEL_CONSUMPTION,
+ AVG_DAILY_DRIVING_DURATION,
+ AVG_DAILY_DRIVING_SESSION,
+ AVG_LOAD,
+ DAILY_ACTIVITY,
+ BREAKDOWN_FREQUENCY,
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 365b7d85b2d..7dd872122f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -20,16 +20,28 @@
package org.apache.iotdb.db.utils;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl;
+import
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.AggrWindowIterator;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.tsfile.read.common.block.column.LongColumn;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TimeDuration;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import java.io.ByteArrayOutputStream;
@@ -37,10 +49,29 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_DAILY_DRIVING_DURATION_DATA_TYPES;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_DAILY_DRIVING_SESSION_DATA_TYPES;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_LOAD_DATA_TYPES;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_VS_PROJ_FUEL_CONSUMPTION_DATA_TYPES;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.DAILY_ACTIVITY_DATA_TYPES;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.DRIVER_LEVEL;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.FLEET_LEVEL;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.HIGH_LOAD_DATA_TYPES;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.LONG_DRIVING_SESSIONS_DATA_TYPES;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.MODEL_LEVEL;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.NAME_LEVEL;
+import static
org.apache.iotdb.db.utils.TimestampPrecisionUtils.convertToCurrPrecision;
/** TimeValuePairUtils to convert between thrift format and TsFile format. */
public class QueryDataSetUtils {
@@ -843,4 +874,598 @@ public class QueryDataSetUtils {
}
values[columnIndex] = binaryValues;
}
+
+  /**
+   * Drains {@code queryExecution} and keeps every row whose value is at least 90% of the load
+   * capacity registered for the row's device.
+   *
+   * <p>Each input row provides a series path in column 0 and the value as text in column 1. The
+   * device key used for the attribute lookup is the series path without its last 13 characters.
+   * Every kept row is written with its original timestamp, the name and driver levels of the
+   * path, the parsed value and the load capacity.
+   *
+   * @param queryExecution source of result batches; read until it reports no further batch
+   * @param deviceAttributesMap attributes per device key; must contain every device encountered
+   * @param serde serializer for the assembled block
+   * @return a single serialized block paired with {@code true} (the source is always fully read)
+   * @throws IoTDBException if fetching a batch fails
+   * @throws IOException if serialization fails
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructHighLoadResult(
+      IQueryExecution queryExecution,
+      Map<String, ClientRPCServiceImpl.DeviceAttributes> deviceAttributesMap,
+      TsBlockSerde serde)
+      throws IoTDBException, IOException {
+    TsBlockBuilder resultBuilder = new TsBlockBuilder(HIGH_LOAD_DATA_TYPES);
+    TimeColumnBuilder timeOut = resultBuilder.getTimeColumnBuilder();
+    ColumnBuilder nameOut = resultBuilder.getColumnBuilder(0);
+    ColumnBuilder driverOut = resultBuilder.getColumnBuilder(1);
+    ColumnBuilder currentLoadOut = resultBuilder.getColumnBuilder(2);
+    ColumnBuilder loadCapacityOut = resultBuilder.getColumnBuilder(3);
+
+    for (Optional<TsBlock> batch = queryExecution.getBatchResult();
+        batch.isPresent();
+        batch = queryExecution.getBatchResult()) {
+      TsBlock block = batch.get();
+      if (block.isEmpty()) {
+        continue;
+      }
+
+      BinaryColumn pathColumn = (BinaryColumn) block.getColumn(0);
+      BinaryColumn loadColumn = (BinaryColumn) block.getColumn(1);
+      int rowCount = block.getPositionCount();
+      for (int row = 0; row < rowCount; row++) {
+        String path = pathColumn.getBinary(row).getStringValue(StandardCharsets.UTF_8);
+        String[] levels = path.split("\\.");
+        String truckName = levels[NAME_LEVEL];
+        String driverName = levels[DRIVER_LEVEL];
+        String loadText = loadColumn.getBinary(row).getStringValue(StandardCharsets.UTF_8);
+        double currentLoad = Double.parseDouble(loadText);
+        String deviceKey = path.substring(0, path.length() - 13);
+        double capacity = deviceAttributesMap.get(deviceKey).loadCapacity;
+
+        // written as a negated >= so that a NaN value is skipped exactly as before
+        if (!(currentLoad >= 0.9 * capacity)) {
+          continue;
+        }
+
+        resultBuilder.declarePosition();
+        timeOut.writeLong(block.getTimeByIndex(row));
+        nameOut.writeBinary(new Binary(truckName, StandardCharsets.UTF_8));
+        driverOut.writeBinary(new Binary(driverName, StandardCharsets.UTF_8));
+        currentLoadOut.writeDouble(currentLoad);
+        loadCapacityOut.writeDouble(capacity);
+      }
+    }
+
+    // the loop above only ends once the source has no more batches
+    boolean exhausted = true;
+    ByteBuffer serialized = serde.serialize(resultBuilder.build());
+    return new Pair<>(Collections.singletonList(serialized), exhausted);
+  }
+
+  /**
+   * Drains {@code queryExecution} and outputs one row for every device that appears in more than
+   * {@code threshold} consecutive input rows.
+   *
+   * <p>Rows are expected to arrive grouped by device (column 0). A run ends when the device value
+   * changes or the input is exhausted. The same {@code threshold} is applied to the final run;
+   * previously that check was hard-coded to 22 and ignored the parameter.
+   *
+   * @param queryExecution source of result batches; read until it reports no further batch
+   * @param serde serializer for the assembled block
+   * @param threshold a device is reported only when its run length is strictly greater than this
+   * @return a single serialized block paired with {@code true} (the source is always fully read)
+   * @throws IoTDBException if fetching a batch fails
+   * @throws IOException if serialization fails
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructLongDrivingSessionsResult(
+      IQueryExecution queryExecution, TsBlockSerde serde, int threshold)
+      throws IoTDBException, IOException {
+    boolean finished;
+
+    TsBlockBuilder builder = new TsBlockBuilder(LONG_DRIVING_SESSIONS_DATA_TYPES);
+
+    Binary previousDevice = new Binary("", StandardCharsets.UTF_8);
+    int count = 0;
+    while (true) {
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      if (!optionalTsBlock.isPresent()) {
+        finished = true;
+        break;
+      }
+      TsBlock tsBlock = optionalTsBlock.get();
+
+      if (!tsBlock.isEmpty()) {
+        BinaryColumn deviceColumn = (BinaryColumn) tsBlock.getColumn(0);
+        int currentCount = tsBlock.getPositionCount();
+        for (int i = 0; i < currentCount; i++) {
+          Binary deviceId = deviceColumn.getBinary(i);
+          // TODO ty optimize Binary equals method
+          if (previousDevice.equals(deviceId)) {
+            count++;
+          } else {
+            // device changed, decide whether the run that just ended is long enough to report
+            if (count > threshold) {
+              appendLongDrivingSessionRow(builder, previousDevice);
+            }
+            previousDevice = deviceId;
+            count = 1;
+          }
+        }
+      }
+    }
+
+    // the last run has no following device change, so it is evaluated here with the same threshold
+    if (count > threshold) {
+      appendLongDrivingSessionRow(builder, previousDevice);
+    }
+
+    return new Pair<>(Collections.singletonList(serde.serialize(builder.build())), finished);
+  }
+
+  /** Appends one (time = 0, name, driver) row for {@code device} to {@code builder}. */
+  private static void appendLongDrivingSessionRow(TsBlockBuilder builder, Binary device) {
+    String[] seriesArray = device.getStringValue(StandardCharsets.UTF_8).split("\\.");
+    String name = seriesArray[NAME_LEVEL];
+    String driver = seriesArray[DRIVER_LEVEL];
+    builder.declarePosition();
+    builder.getTimeColumnBuilder().writeLong(0);
+    // name column
+    builder.getColumnBuilder(0).writeBinary(new Binary(name, StandardCharsets.UTF_8));
+    // driver column
+    builder.getColumnBuilder(1).writeBinary(new Binary(driver, StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Drains {@code queryExecution} and, per model, outputs the average fuel figure next to the
+   * count-weighted average of the devices' nominal fuel consumption.
+   *
+   * <p>Each input row provides a device path (column 0), a fuel sum (column 1) and a sample count
+   * (column 2). Rows are grouped by the model level of the device path. One output row is written
+   * per model, with time 0.
+   *
+   * @param queryExecution source of result batches; read until it reports no further batch
+   * @param deviceAttributesMap attributes per device path; must contain every device encountered
+   * @param serde serializer for the assembled block
+   * @return a single serialized block paired with {@code true} (the source is always fully read)
+   * @throws IoTDBException if fetching a batch fails
+   * @throws IOException if serialization fails
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructAvgVsProjectedFuelConsumptionResult(
+      IQueryExecution queryExecution,
+      Map<String, ClientRPCServiceImpl.DeviceAttributes> deviceAttributesMap,
+      TsBlockSerde serde)
+      throws IoTDBException, IOException {
+    Map<String, AvgIntermediateResult> totalsByModel = new HashMap<>();
+
+    Optional<TsBlock> batch;
+    while ((batch = queryExecution.getBatchResult()).isPresent()) {
+      TsBlock block = batch.get();
+      if (block.isEmpty()) {
+        continue;
+      }
+
+      BinaryColumn devices = (BinaryColumn) block.getColumn(0);
+      DoubleColumn fuelSums = (DoubleColumn) block.getColumn(1);
+      LongColumn fuelCounts = (LongColumn) block.getColumn(2);
+
+      int rowCount = block.getPositionCount();
+      for (int row = 0; row < rowCount; row++) {
+        String device = devices.getBinary(row).getStringValue(StandardCharsets.UTF_8);
+        double nominal = deviceAttributesMap.get(device).nominalFuelConsumption;
+        String model = device.split("\\.")[MODEL_LEVEL];
+        AvgIntermediateResult totals =
+            totalsByModel.computeIfAbsent(model, ignored -> new AvgIntermediateResult());
+        long samples = fuelCounts.getLong(row);
+        totals.count += samples;
+        totals.fuelSum += fuelSums.getDouble(row);
+        totals.nominalFuelSum += (nominal * samples);
+      }
+    }
+
+    int modelCount = totalsByModel.size();
+    TsBlockBuilder resultBuilder =
+        new TsBlockBuilder(modelCount, AVG_VS_PROJ_FUEL_CONSUMPTION_DATA_TYPES);
+    TimeColumnBuilder timeOut = resultBuilder.getTimeColumnBuilder();
+    ColumnBuilder modelOut = resultBuilder.getColumnBuilder(0);
+    ColumnBuilder avgFuelOut = resultBuilder.getColumnBuilder(1);
+    ColumnBuilder nominalFuelOut = resultBuilder.getColumnBuilder(2);
+
+    for (Map.Entry<String, AvgIntermediateResult> entry : totalsByModel.entrySet()) {
+      AvgIntermediateResult totals = entry.getValue();
+      timeOut.writeLong(0);
+      modelOut.writeBinary(new Binary(entry.getKey(), StandardCharsets.UTF_8));
+      avgFuelOut.writeDouble(totals.fuelSum / totals.count);
+      nominalFuelOut.writeDouble(totals.nominalFuelSum / totals.count);
+    }
+
+    // positions are declared in bulk after all column values have been written
+    resultBuilder.declarePositions(modelCount);
+
+    // the read loop only ends once the source has no more batches
+    boolean exhausted = true;
+    ByteBuffer serialized = serde.serialize(resultBuilder.build());
+    return new Pair<>(Collections.singletonList(serialized), exhausted);
+  }
+
+  /** Mutable running totals for one group while averaging fuel figures; all totals start at zero. */
+  private static class AvgIntermediateResult {
+    /** Total number of samples added so far. */
+    long count;
+
+    /** Sum of the fuel values added so far. */
+    double fuelSum;
+
+    /** Sum of (nominal fuel consumption * sample count) added so far. */
+    double nominalFuelSum;
+  }
+
+ /**
+ * Drains {@code queryExecution} and produces, per device and per one-day window between
+ * {@code startTime} and {@code endTime}, the number of input rows in that window divided by 6.
+ *
+ * <p>Input rows carry a timestamp and a device path (column 0) and are processed as runs of equal
+ * device values. Windows come from an {@code AggrWindowIterator} that is reset for every new
+ * device. Windows without rows that lie before a row of the same device are written with 0.0.
+ *
+ * <p>NOTE(review): rows are presumably one per 10 minutes of driving, so that count / 6 is a
+ * number of hours - confirm against the SQL template.
+ *
+ * @param queryExecution source of result batches; read until it reports no further batch
+ * @param serde serializer for the assembled block
+ * @param startTime start of the overall range handed to the window iterator
+ * @param endTime end of the overall range handed to the window iterator
+ * @return a single serialized block paired with {@code true} (the source is always fully read)
+ * @throws IoTDBException declared for {@code queryExecution.getBatchResult()}
+ * @throws IOException declared for {@code serde.serialize}
+ */
+ public static Pair<List<ByteBuffer>, Boolean>
constructAvgDailyDrivingDurationResult(
+ IQueryExecution queryExecution, TsBlockSerde serde, long startTime, long
endTime)
+ throws IoTDBException, IOException {
+ boolean finished;
+
+ TsBlockBuilder builder = new
TsBlockBuilder(AVG_DAILY_DRIVING_DURATION_DATA_TYPES);
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ ColumnBuilder fleetColumnBuilder = builder.getColumnBuilder(0);
+ ColumnBuilder nameColumnBuilder = builder.getColumnBuilder(1);
+ ColumnBuilder driverColumnBuilder = builder.getColumnBuilder(2);
+ ColumnBuilder avgDailyHoursColumnBuilder = builder.getColumnBuilder(3);
+
+ // empty placeholder so that the first row is treated as a device change; fleet == null marks
+ // "no device seen yet"
+ Binary previousDevice = new Binary("", StandardCharsets.UTF_8);
+ Binary fleet = null;
+ Binary name = null;
+ Binary driver = null;
+
+ // one day in the current timestamp precision; passed twice to AggrWindowIterator (presumably as
+ // window length and sliding step - confirm against its constructor)
+ TimeDuration interval = new TimeDuration(0, convertToCurrPrecision(1,
TimeUnit.DAYS));
+ AggrWindowIterator timeRangeIterator =
+ new AggrWindowIterator(startTime, endTime, interval, interval, true,
true);
+ TimeRange currentTimeRange = timeRangeIterator.nextTimeRange();
+ // rows seen for the current device inside currentTimeRange
+ int count = 0;
+ while (true) {
+ Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+ if (!optionalTsBlock.isPresent()) {
+ finished = true;
+ break;
+ }
+ TsBlock tsBlock = optionalTsBlock.get();
+
+ if (!tsBlock.isEmpty()) {
+ TimeColumn timeColumn = tsBlock.getTimeColumn();
+ BinaryColumn deviceColumn = (BinaryColumn) tsBlock.getColumn(0);
+ int currentCount = tsBlock.getPositionCount();
+ for (int i = 0; i < currentCount; i++) {
+ Binary deviceId = deviceColumn.getBinary(i);
+ long currentTime = timeColumn.getLong(i);
+ if (previousDevice.equals(deviceId)) {
+ // calculate current day
+ if (currentTimeRange.contains(
+ currentTime)) { // current time range for current device is
not finished
+ count++;
+ } else {
+ // previous day's result is done
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(currentTimeRange.getMin());
+ fleetColumnBuilder.writeBinary(fleet);
+ nameColumnBuilder.writeBinary(name);
+ driverColumnBuilder.writeBinary(driver);
+ avgDailyHoursColumnBuilder.writeDouble(count / 6.0d);
+
+ // move time range to next one that contains the current time
+ // NOTE(review): this loop relies on some remaining window containing currentTime; what
+ // nextTimeRange() yields once the iterator is exhausted is not visible here - confirm
+ currentTimeRange = timeRangeIterator.nextTimeRange();
+ while (!currentTimeRange.contains(currentTime)) {
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(currentTimeRange.getMin());
+ fleetColumnBuilder.writeBinary(fleet);
+ nameColumnBuilder.writeBinary(name);
+ driverColumnBuilder.writeBinary(driver);
+ avgDailyHoursColumnBuilder.writeDouble(0.0d);
+ currentTimeRange = timeRangeIterator.nextTimeRange();
+ }
+ count = 1;
+ }
+ } else { // device changed, we need to decide whether to output
previous device
+
+ if (fleet != null) {
+ // construct remaining data for previous device
+ // NOTE(review): nextTimeRange() is called before hasNextTimeRange() is checked, so the
+ // window fetched in the final iteration is never written - confirm that the trailing
+ // window is meant to be omitted when it holds no rows
+ do {
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(currentTimeRange.getMin());
+ fleetColumnBuilder.writeBinary(fleet);
+ nameColumnBuilder.writeBinary(name);
+ driverColumnBuilder.writeBinary(driver);
+ avgDailyHoursColumnBuilder.writeDouble(count / 6.0d);
+ currentTimeRange = timeRangeIterator.nextTimeRange();
+ count = 0;
+ } while (timeRangeIterator.hasNextTimeRange());
+ }
+
+ // reset
+ previousDevice = deviceId;
+ String[] splitArray =
deviceId.getStringValue(StandardCharsets.UTF_8).split("\\.");
+ fleet = new Binary(splitArray[FLEET_LEVEL],
StandardCharsets.UTF_8);
+ name = new Binary(splitArray[NAME_LEVEL], StandardCharsets.UTF_8);
+ driver = new Binary(splitArray[DRIVER_LEVEL],
StandardCharsets.UTF_8);
+ timeRangeIterator.reset();
+
+ // move time range to next one that contains the current time
+ currentTimeRange = timeRangeIterator.nextTimeRange();
+ while (!currentTimeRange.contains(currentTime)) {
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(currentTimeRange.getMin());
+ fleetColumnBuilder.writeBinary(fleet);
+ nameColumnBuilder.writeBinary(name);
+ driverColumnBuilder.writeBinary(driver);
+ avgDailyHoursColumnBuilder.writeDouble(0.0d);
+ currentTimeRange = timeRangeIterator.nextTimeRange();
+ }
+ count = 1;
+ }
+ }
+ }
+ }
+
+ // output last device
+ if (fleet != null) {
+ // construct remaining data for last device
+ // NOTE(review): same nextTimeRange()/hasNextTimeRange() ordering as the device-change branch
+ do {
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(currentTimeRange.getMin());
+ fleetColumnBuilder.writeBinary(fleet);
+ nameColumnBuilder.writeBinary(name);
+ driverColumnBuilder.writeBinary(driver);
+ avgDailyHoursColumnBuilder.writeDouble(count / 6.0d);
+ currentTimeRange = timeRangeIterator.nextTimeRange();
+ count = 0;
+ } while (timeRangeIterator.hasNextTimeRange());
+ }
+
+ return new
Pair<>(Collections.singletonList(serde.serialize(builder.build())), finished);
+ }
+
+  /**
+   * Drains {@code queryExecution} and outputs, for every completed group of 144 consecutive input
+   * rows, the average length of the driving sessions found in that group.
+   *
+   * <p>Input rows carry a timestamp, a device path (column 0) and a velocity (column 1). A driving
+   * session is a maximal run of rows whose velocity is non-null and greater than 5. It lasts from
+   * its first row to the first row that no longer qualifies, or to the end of the day when it is
+   * still open at the last row. Each output row holds the day start
+   * ({@code startTime + day * oneDay}), the name level of the device path of the group's last row,
+   * and the average session duration.
+   *
+   * <p>A group without any driving session now yields 0 instead of failing with an
+   * {@link ArithmeticException} (long division by zero).
+   *
+   * <p>NOTE(review): the code assumes exactly 144 rows per device and day and 10 days per device
+   * (the day counter wraps at 10); {@code endTime} is not used. Confirm against the SQL template.
+   *
+   * @param queryExecution source of result batches; read until it reports no further batch
+   * @param serde serializer for the assembled block
+   * @param startTime timestamp of the first day's start
+   * @param endTime unused
+   * @return a single serialized block paired with {@code true} (the source is always fully read)
+   * @throws IoTDBException if fetching a batch fails
+   * @throws IOException if serialization fails
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructAvgDailyDrivingSessionResult(
+      IQueryExecution queryExecution, TsBlockSerde serde, long startTime, long endTime)
+      throws IoTDBException, IOException {
+    boolean finished;
+
+    TsBlockBuilder builder = new TsBlockBuilder(AVG_DAILY_DRIVING_SESSION_DATA_TYPES);
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    ColumnBuilder nameColumnBuilder = builder.getColumnBuilder(0);
+    ColumnBuilder durationColumnBuilder = builder.getColumnBuilder(1);
+
+    int numOfTenMinutesInOneDay = 144;
+    List<Double> velocityList = new ArrayList<>(numOfTenMinutesInOneDay);
+    List<Long> timeList = new ArrayList<>(numOfTenMinutesInOneDay);
+    long dayDuration = convertToCurrPrecision(1, TimeUnit.DAYS);
+    int day = 0;
+    while (true) {
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      if (!optionalTsBlock.isPresent()) {
+        finished = true;
+        break;
+      }
+      TsBlock tsBlock = optionalTsBlock.get();
+
+      if (!tsBlock.isEmpty()) {
+        TimeColumn timeColumn = tsBlock.getTimeColumn();
+        BinaryColumn deviceColumn = (BinaryColumn) tsBlock.getColumn(0);
+        DoubleColumn velocityColumn = (DoubleColumn) tsBlock.getColumn(1);
+        int currentCount = tsBlock.getPositionCount();
+        for (int i = 0; i < currentCount; i++) {
+          timeList.add(timeColumn.getLong(i));
+          velocityList.add(velocityColumn.isNull(i) ? null : velocityColumn.getDouble(i));
+
+          if (timeList.size() == numOfTenMinutesInOneDay) {
+            builder.declarePosition();
+            timeColumnBuilder.writeLong(startTime + day * dayDuration);
+            nameColumnBuilder.writeBinary(
+                new Binary(
+                    deviceColumn
+                        .getBinary(i)
+                        .getStringValue(StandardCharsets.UTF_8)
+                        .split("\\.")[NAME_LEVEL],
+                    StandardCharsets.UTF_8));
+
+            // a session still open at the last row is closed at the end of the current day
+            long dayEnd = startTime + (day + 1) * dayDuration;
+            durationColumnBuilder.writeLong(
+                averageDrivingSessionDuration(timeList, velocityList, dayEnd));
+
+            timeList.clear();
+            velocityList.clear();
+
+            day++;
+            // 10 days is done, move to next device
+            if (day == 10) {
+              day = 0;
+            }
+          }
+        }
+      }
+    }
+
+    return new Pair<>(Collections.singletonList(serde.serialize(builder.build())), finished);
+  }
+
+  /**
+   * Averages the lengths of the driving sessions (runs with velocity above 5) in one day's samples;
+   * returns 0 when the day has no session.
+   */
+  private static long averageDrivingSessionDuration(
+      List<Long> times, List<Double> velocities, long dayEnd) {
+    long durationSum = 0;
+    long durationCount = 0;
+    Long start = null;
+    for (int index = 0; index < times.size(); index++) {
+      Double v = velocities.get(index);
+      if (v != null && v > 5) {
+        if (start == null) {
+          start = times.get(index);
+        }
+      } else if (start != null) {
+        // one driving duration ended at this sample
+        durationSum += (times.get(index) - start);
+        durationCount++;
+        start = null;
+      }
+    }
+
+    if (start != null) {
+      // the day ended while still driving
+      durationSum += (dayEnd - start);
+      durationCount++;
+    }
+    return durationCount == 0 ? 0L : durationSum / durationCount;
+  }
+
+  /**
+   * Drains {@code queryExecution}, groups the non-null average loads by (fleet, model, load
+   * capacity) and emits, per group, the mean of {@code avgLoad / loadCapacity}.
+   *
+   * <p>Fleet and model are the {@code FLEET_LEVEL} and {@code MODEL_LEVEL} segments of the
+   * dot-separated device id; the load capacity is looked up in {@code deviceAttributesMap} by the
+   * full device id. Every output row carries timestamp 0.
+   *
+   * @param queryExecution source of the result batches; read until it returns an empty Optional
+   * @param deviceAttributesMap attributes keyed by device id; must contain every device that
+   *     appears with a non-null load
+   * @param serde serializer used to encode the single resulting TsBlock
+   * @return the serialized block wrapped in a singleton list, and {@code true} because the whole
+   *     input has been consumed
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructAvgLoadResult(
+      IQueryExecution queryExecution,
+      Map<String, ClientRPCServiceImpl.DeviceAttributes> deviceAttributesMap,
+      TsBlockSerde serde)
+      throws IoTDBException, IOException {
+    Map<AvgLoadKey, AvgLoadIntermediateResult> groups = new HashMap<>();
+
+    // phase 1: accumulate sum and count per group
+    for (Optional<TsBlock> batch = queryExecution.getBatchResult();
+        batch.isPresent();
+        batch = queryExecution.getBatchResult()) {
+      TsBlock block = batch.get();
+      if (block.isEmpty()) {
+        continue;
+      }
+      BinaryColumn devices = (BinaryColumn) block.getColumn(0);
+      DoubleColumn loads = (DoubleColumn) block.getColumn(1);
+      int rows = block.getPositionCount();
+      for (int row = 0; row < rows; row++) {
+        if (!loads.isNull(row)) {
+          String deviceId = devices.getBinary(row).getStringValue(StandardCharsets.UTF_8);
+          String[] pathNodes = deviceId.split("\\.");
+          String fleet = pathNodes[FLEET_LEVEL];
+          String model = pathNodes[MODEL_LEVEL];
+          double load = loads.getDouble(row);
+          double capacity = deviceAttributesMap.get(deviceId).loadCapacity;
+          AvgLoadIntermediateResult acc =
+              groups.computeIfAbsent(
+                  new AvgLoadKey(fleet, model, capacity), k -> new AvgLoadIntermediateResult());
+          acc.sum += (load / capacity);
+          acc.count++;
+        }
+      }
+    }
+
+    // phase 2: one output row per group
+    int groupCount = groups.size();
+    TsBlockBuilder builder = new TsBlockBuilder(groupCount, AVG_LOAD_DATA_TYPES);
+    TimeColumnBuilder timeOut = builder.getTimeColumnBuilder();
+    ColumnBuilder fleetOut = builder.getColumnBuilder(0);
+    ColumnBuilder modelOut = builder.getColumnBuilder(1);
+    ColumnBuilder capacityOut = builder.getColumnBuilder(2);
+    ColumnBuilder avgLoadOut = builder.getColumnBuilder(3);
+
+    for (Map.Entry<AvgLoadKey, AvgLoadIntermediateResult> entry : groups.entrySet()) {
+      AvgLoadKey key = entry.getKey();
+      AvgLoadIntermediateResult acc = entry.getValue();
+      timeOut.writeLong(0L);
+      fleetOut.writeBinary(new Binary(key.fleet, StandardCharsets.UTF_8));
+      modelOut.writeBinary(new Binary(key.model, StandardCharsets.UTF_8));
+      capacityOut.writeDouble(key.loadCapacity);
+      avgLoadOut.writeDouble(acc.sum / acc.count);
+    }
+
+    builder.declarePositions(groupCount);
+
+    return new Pair<>(Collections.singletonList(serde.serialize(builder.build())), true);
+  }
+
+  /** Mutable running total for one average-load group. */
+  private static class AvgLoadIntermediateResult {
+    /** Sum of the values added to the group so far. */
+    double sum;
+
+    /** Number of values added to the group so far. */
+    long count;
+  }
+
+  /** Immutable grouping key of the average-load result: (fleet, model, load capacity). */
+  private static class AvgLoadKey {
+    private final String fleet;
+    private final String model;
+    private final double loadCapacity;
+
+    public AvgLoadKey(String fleet, String model, double loadCapacity) {
+      this.fleet = fleet;
+      this.model = model;
+      this.loadCapacity = loadCapacity;
+    }
+
+    /** Two keys are equal when they have the same class and all three components match. */
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      }
+      AvgLoadKey other = (AvgLoadKey) o;
+      if (Double.compare(loadCapacity, other.loadCapacity) != 0) {
+        return false;
+      }
+      return Objects.equals(fleet, other.fleet) && Objects.equals(model, other.model);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(fleet, model, loadCapacity);
+    }
+  }
+
+  /**
+   * Drains {@code queryExecution}, counts the input rows per (fleet, model, day) and emits one
+   * row per group whose value is that count divided by 144 (integer division).
+   *
+   * <p>Fleet and model are the {@code FLEET_LEVEL} and {@code MODEL_LEVEL} segments of the
+   * dot-separated device id; the day is the row timestamp truncated to a multiple of one day in
+   * the current time precision, and is also used as the output timestamp.
+   *
+   * @param queryExecution source of the result batches; read until it returns an empty Optional
+   * @param serde serializer used to encode the single resulting TsBlock
+   * @return the serialized block wrapped in a singleton list, and {@code true} because the whole
+   *     input has been consumed
+   */
+  public static Pair<List<ByteBuffer>, Boolean> constructDailyActivityResult(
+      IQueryExecution queryExecution, TsBlockSerde serde) throws IoTDBException, IOException {
+    Map<DailyActivityKey, DailyActivityValue> groups = new HashMap<>();
+    long oneDay = convertToCurrPrecision(1, TimeUnit.DAYS);
+
+    // phase 1: count rows per group
+    for (Optional<TsBlock> batch = queryExecution.getBatchResult();
+        batch.isPresent();
+        batch = queryExecution.getBatchResult()) {
+      TsBlock block = batch.get();
+      if (block.isEmpty()) {
+        continue;
+      }
+      TimeColumn times = block.getTimeColumn();
+      BinaryColumn devices = (BinaryColumn) block.getColumn(0);
+      int rows = block.getPositionCount();
+      for (int row = 0; row < rows; row++) {
+        String[] pathNodes =
+            devices.getBinary(row).getStringValue(StandardCharsets.UTF_8).split("\\.");
+        String fleet = pathNodes[FLEET_LEVEL];
+        String model = pathNodes[MODEL_LEVEL];
+        long timestamp = times.getLong(row);
+        // same value as timestamp / oneDay * oneDay
+        long dayStart = timestamp - timestamp % oneDay;
+        groups.computeIfAbsent(
+                new DailyActivityKey(fleet, model, dayStart), k -> new DailyActivityValue())
+            .count++;
+      }
+    }
+
+    // phase 2: one output row per group
+    int groupCount = groups.size();
+    TsBlockBuilder builder = new TsBlockBuilder(groupCount, DAILY_ACTIVITY_DATA_TYPES);
+    TimeColumnBuilder timeOut = builder.getTimeColumnBuilder();
+    ColumnBuilder fleetOut = builder.getColumnBuilder(0);
+    ColumnBuilder modelOut = builder.getColumnBuilder(1);
+    ColumnBuilder activityOut = builder.getColumnBuilder(2);
+
+    for (Map.Entry<DailyActivityKey, DailyActivityValue> entry : groups.entrySet()) {
+      DailyActivityKey key = entry.getKey();
+      timeOut.writeLong(key.day);
+      fleetOut.writeBinary(new Binary(key.fleet, StandardCharsets.UTF_8));
+      modelOut.writeBinary(new Binary(key.model, StandardCharsets.UTF_8));
+      activityOut.writeLong(entry.getValue().count / 144);
+    }
+
+    builder.declarePositions(groupCount);
+
+    return new Pair<>(Collections.singletonList(serde.serialize(builder.build())), true);
+  }
+
+  /** Immutable grouping key of the daily-activity result: (fleet, model, day). */
+  private static class DailyActivityKey {
+    private final String fleet;
+    private final String model;
+    private final long day;
+
+    public DailyActivityKey(String fleet, String model, long day) {
+      this.fleet = fleet;
+      this.model = model;
+      this.day = day;
+    }
+
+    /** Two keys are equal when they have the same class and all three components match. */
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      }
+      DailyActivityKey other = (DailyActivityKey) o;
+      if (day != other.day) {
+        return false;
+      }
+      return Objects.equals(fleet, other.fleet) && Objects.equals(model, other.model);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(fleet, model, day);
+    }
+  }
+
+  /** Mutable row counter for one daily-activity group. */
+  private static class DailyActivityValue {
+    /** Number of rows seen for the group so far. */
+    long count;
+  }
}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index a99a8e599c9..c59bf4addf8 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -528,6 +528,85 @@ struct TSConnectionInfoResp {
1: required list<TSConnectionInfo> connectionInfoList
}
+// Request argument of IClientRPCService.highLoad.
+// NOTE(review): the unit of `timeout` and the exact meaning of `fleet` (presumably the fleet
+// to filter on) are not stated here - confirm against the server implementation.
+struct TSHighLoadReq {
+ 1: required i64 sessionId
+ 2: required i64 statementId
+ 3: required string fleet
+ 4: required i32 fetchSize
+ 5: required i64 timeout
+}
+
+// Request argument of IClientRPCService.longDrivingSessions.
+// NOTE(review): startTime/endTime presumably bound the queried time range; their precision,
+// their inclusiveness and the unit of `timeout` are not stated here - confirm.
+struct TSLongDrivingSessionsReq {
+ 1: required i64 sessionId
+ 2: required i64 statementId
+ 3: required string fleet
+ 4: required i64 startTime
+ 5: required i64 endTime
+ 6: required i32 fetchSize
+ 7: required i64 timeout
+}
+
+// Request argument of IClientRPCService.longDailySessions.
+// Has the same field layout as TSLongDrivingSessionsReq.
+// NOTE(review): startTime/endTime presumably bound the queried time range; their precision,
+// their inclusiveness and the unit of `timeout` are not stated here - confirm.
+struct TSLongDailySessionsReq {
+ 1: required i64 sessionId
+ 2: required i64 statementId
+ 3: required string fleet
+ 4: required i64 startTime
+ 5: required i64 endTime
+ 6: required i32 fetchSize
+ 7: required i64 timeout
+}
+
+// Request argument of IClientRPCService.avgVsProjectedFuelConsumption.
+// Carries no query-specific parameters, only session/statement ids, fetch size and timeout.
+// NOTE(review): the unit of `timeout` is not stated here - confirm.
+struct TSAvgVsProjectedFuelConsumptionReq {
+ 1: required i64 sessionId
+ 2: required i64 statementId
+ 3: required i32 fetchSize
+ 4: required i64 timeout
+}
+
+// Request argument of IClientRPCService.avgDailyDrivingDuration.
+// NOTE(review): startTime/endTime presumably bound the queried time range; their precision,
+// their inclusiveness and the unit of `timeout` are not stated here - confirm.
+struct TSAvgDailyDrivingDurationReq {
+ 1: required i64 sessionId
+ 2: required i64 statementId
+ 3: required i64 startTime
+ 4: required i64 endTime
+ 5: required i32 fetchSize
+ 6: required i64 timeout
+}
+
+// Request argument of IClientRPCService.avgDailyDrivingSession.
+// NOTE(review): startTime/endTime presumably bound the queried time range; their precision,
+// their inclusiveness and the unit of `timeout` are not stated here - confirm.
+struct TSAvgDailyDrivingSessionReq {
+ 1: required i64 sessionId
+ 2: required i64 statementId
+ 3: required i64 startTime
+ 4: required i64 endTime
+ 5: required i32 fetchSize
+ 6: required i64 timeout
+}
+
+// Request argument of IClientRPCService.avgLoad.
+// Carries no query-specific parameters, only session/statement ids, fetch size and timeout.
+// NOTE(review): the unit of `timeout` is not stated here - confirm.
+struct TSAvgLoadReq {
+ 1: required i64 sessionId
+ 2: required i64 statementId
+ 3: required i32 fetchSize
+ 4: required i64 timeout
+}
+
+// Request argument of IClientRPCService.dailyActivity.
+// NOTE(review): startTime/endTime presumably bound the queried time range; their precision,
+// their inclusiveness and the unit of `timeout` are not stated here - confirm.
+struct TSDailyActivityReq {
+ 1: required i64 sessionId
+ 2: required i64 statementId
+ 3: required i64 startTime
+ 4: required i64 endTime
+ 5: required i32 fetchSize
+ 6: required i64 timeout
+}
+
+// Request argument of IClientRPCService.breakdownFrequency.
+// NOTE(review): startTime/endTime presumably bound the queried time range; their precision,
+// their inclusiveness and the unit of `timeout` are not stated here - confirm.
+struct TSBreakdownFrequencyReq {
+ 1: required i64 sessionId
+ 2: required i64 statementId
+ 3: required i64 startTime
+ 4: required i64 endTime
+ 5: required i32 fetchSize
+ 6: required i64 timeout
+}
+
+
service IClientRPCService {
TSExecuteStatementResp executeQueryStatementV2(1:TSExecuteStatementReq req);
@@ -624,6 +703,24 @@ service IClientRPCService {
TSExecuteStatementResp executeAggregationQuery(1:TSAggregationQueryReq req);
+ TSExecuteStatementResp highLoad(1:TSHighLoadReq req);
+
+ TSExecuteStatementResp longDrivingSessions(1:TSLongDrivingSessionsReq req);
+
+ TSExecuteStatementResp longDailySessions(1:TSLongDailySessionsReq req);
+
+ TSExecuteStatementResp
avgVsProjectedFuelConsumption(1:TSAvgVsProjectedFuelConsumptionReq req);
+
+ TSExecuteStatementResp
avgDailyDrivingDuration(1:TSAvgDailyDrivingDurationReq req);
+
+ TSExecuteStatementResp avgDailyDrivingSession(1:TSAvgDailyDrivingSessionReq
req);
+
+ TSExecuteStatementResp avgLoad(1:TSAvgLoadReq req);
+
+ TSExecuteStatementResp dailyActivity(1:TSDailyActivityReq req);
+
+ TSExecuteStatementResp breakdownFrequency(1:TSBreakdownFrequencyReq req);
+
i64 requestStatementId(1:i64 sessionId);
common.TSStatus createSchemaTemplate(1:TSCreateSchemaTemplateReq req);