This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch tsbs/iot
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tsbs/iot by this push:
new 8469dc0abfd init
8469dc0abfd is described below
commit 8469dc0abfdfe8ee7f61e59ac62a831fb830763d
Author: JackieTien97 <[email protected]>
AuthorDate: Sun Apr 28 20:40:15 2024 +0800
init
---
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 170 ++++++++++++++++++++-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 106 +++++++++++++
2 files changed, 275 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 320e8fc7bfb..998c289b1a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -218,6 +218,7 @@ import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_D
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_DAILY_DRIVING_SESSION;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_LOAD;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_VS_PROJECTED_FUEL_CONSUMPTION;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.BREAKDOWN_FREQUENCY;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.DAILY_ACTIVITY;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.HIGH_LOAD;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementType.LONG_DRIVING_SESSIONS;
@@ -487,6 +488,37 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
private static final List<Byte> DAILY_ACTIVITY_ALIAS_COLUMNS =
new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+ private static final String BREAKDOWN_FREQUENCY_SQL_TEMPLATE_1 =
+ "select count_time(*) from root.diagnostics.** GROUP BY([%d, %d), 10m)
ALIGN BY DEVICE";
+
+ private static final String BREAKDOWN_FREQUENCY_SQL_TEMPLATE_2 =
+ "select count("
+ + STATUS_COLUMN_NAME
+ + ") from root.diagnostics.** where "
+ + STATUS_COLUMN_NAME
+ + " > 0 GROUP BY([%d, %d), 10m) ALIGN BY DEVICE";
+
+ private static final List<String> BREAKDOWN_FREQUENCY_HEADERS =
+ ImmutableList.of(MODEL_COLUMN_NAME, "breakdown_frequency");
+
+ private static final List<String> BREAKDOWN_FREQUENCY_HEADER_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT.toString(),
TSDataType.INT64.toString());
+
+ public static final List<TSDataType> BREAKDOWN_FREQUENCY_DATA_TYPES =
+ ImmutableList.of(TSDataType.TEXT, TSDataType.INT64);
+
+ private static final Map<String, Integer>
BREAKDOWN_FREQUENCY_COLUMN_NAME_INDEX_MAP =
+ new HashMap<>();
+
+ static {
+ for (int i = 0; i < BREAKDOWN_FREQUENCY_HEADERS.size(); i++) {
+
BREAKDOWN_FREQUENCY_COLUMN_NAME_INDEX_MAP.put(BREAKDOWN_FREQUENCY_HEADERS.get(i),
i);
+ }
+ }
+
+ private static final List<Byte> BREAKDOWN_FREQUENCY_ALIAS_COLUMNS =
+ new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
private static final Map<String, DeviceAttributes> DEVICE_ATTRIBUTES_MAP =
new HashMap<>();
public static class DeviceAttributes {
@@ -3382,7 +3414,143 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp breakdownFrequency(TSBreakdownFrequencyReq
req) throws TException {
- return null;
+ boolean finished = false;
+ long queryId1 = Long.MIN_VALUE;
+ long queryId2 = Long.MAX_VALUE;
+ String statement1 =
+ String.format(BREAKDOWN_FREQUENCY_SQL_TEMPLATE_1, req.startTime,
req.endTime);
+ String statement2 =
+ String.format(BREAKDOWN_FREQUENCY_SQL_TEMPLATE_2, req.startTime,
req.endTime);
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ // quota
+ OperationQuota quota = null;
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.nanoTime();
+ StatementType statementType = null;
+ Throwable t = null;
+ try {
+ Statement s1 = StatementGenerator.createStatement(statement1,
clientSession.getZoneId());
+ Statement s2 = StatementGenerator.createStatement(statement2,
clientSession.getZoneId());
+
+ if (s1 == null || s2 == null) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not
supported"));
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s1, clientSession);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ quota =
+ DataNodeThrottleQuotaManager.getInstance()
+ .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s1);
+ statementType = s1.getType();
+ if (ENABLE_AUDIT_LOG) {
+ AuditLogger.log(statement1, s1);
+ }
+
+ queryId1 = SESSION_MANAGER.requestQueryId(clientSession,
req.statementId);
+ queryId2 = SESSION_MANAGER.requestQueryId(clientSession,
req.statementId);
+
+ // create and cache dataset
+ ExecutionResult result1 =
+ COORDINATOR.executeForTreeModel(
+ s1,
+ queryId1,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ statement1,
+ partitionFetcher,
+ schemaFetcher,
+ req.getTimeout());
+
+ if (result1.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result1.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result1.status);
+ }
+
+ ExecutionResult result2 =
+ COORDINATOR.executeForTreeModel(
+ s2,
+ queryId2,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ statement2,
+ partitionFetcher,
+ schemaFetcher,
+ req.getTimeout());
+
+ if (result2.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result2.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result2.status);
+ }
+
+ IQueryExecution queryExecution1 =
COORDINATOR.getQueryExecution(queryId1);
+ IQueryExecution queryExecution2 =
COORDINATOR.getQueryExecution(queryId2);
+
+ try (SetThreadName threadName = new
SetThreadName(result1.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution1 != null && queryExecution1.isQuery()) {
+ resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+
resp.setColumnNameIndexMap(BREAKDOWN_FREQUENCY_COLUMN_NAME_INDEX_MAP);
+ resp.setSgColumns(Collections.emptyList());
+ resp.setColumns(BREAKDOWN_FREQUENCY_HEADERS);
+ resp.setDataTypeList(BREAKDOWN_FREQUENCY_HEADER_DATA_TYPES);
+ resp.setAliasColumns(BREAKDOWN_FREQUENCY_ALIAS_COLUMNS);
+ resp.setIgnoreTimeStamp(true);
+ resp.setQueryId(queryId1);
+ resp.setStatus(result1.status);
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.constructBreakdownFrequencyResult(
+ queryExecution1, queryExecution2, serde);
+ finished = pair.right;
+ resp.setQueryResult(pair.left);
+ resp.setMoreData(!finished);
+ quota.addReadResult(resp.getQueryResult());
+ } else {
+ finished = true;
+ resp = RpcUtils.getTSExecuteStatementResp(result1.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "breakdown frequency " +
OperationType.EXECUTE_STATEMENT));
+ } catch (Error error) {
+ t = error;
+ throw error;
+ } finally {
+ long currentOperationCost = System.nanoTime() - startTime;
+ COORDINATOR.recordExecutionTime(queryId1, currentOperationCost);
+ COORDINATOR.recordExecutionTime(queryId2, currentOperationCost);
+
+ // record each operation time cost
+ if (statementType != null) {
+ addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT,
+ BREAKDOWN_FREQUENCY.name(),
+ currentOperationCost);
+ }
+
+ if (finished) {
+ // record total time cost for one query
+ long executionTime = COORDINATOR.getTotalExecutionTime(queryId1);
+ addQueryLatency(
+ BREAKDOWN_FREQUENCY, executionTime > 0 ? executionTime :
currentOperationCost);
+ COORDINATOR.cleanupQueryExecution(queryId1, req, t);
+ COORDINATOR.cleanupQueryExecution(queryId2, req, t);
+ }
+ SESSION_MANAGER.updateIdleTime();
+ if (quota != null) {
+ quota.close();
+ }
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 7dd872122f2..4b084af334b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -64,6 +64,7 @@ import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_
import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_DAILY_DRIVING_SESSION_DATA_TYPES;
import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_LOAD_DATA_TYPES;
import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_VS_PROJ_FUEL_CONSUMPTION_DATA_TYPES;
+import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.BREAKDOWN_FREQUENCY_DATA_TYPES;
import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.DAILY_ACTIVITY_DATA_TYPES;
import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.DRIVER_LEVEL;
import static
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.FLEET_LEVEL;
@@ -1468,4 +1469,109 @@ public class QueryDataSetUtils {
private static class DailyActivityValue {
long count = 0;
}
+
+ public static Pair<List<ByteBuffer>, Boolean>
constructBreakdownFrequencyResult(
+ IQueryExecution queryExecution1, IQueryExecution queryExecution2,
TsBlockSerde serde)
+ throws IoTDBException, IOException {
+
+ Map<String, DailyActivityValue> map = new HashMap<>();
+
+ TsBlock totalTsBlock = null;
+ boolean totalFinished = false;
+ int totalIndex = 0;
+ TsBlock statusTsBlock = null;
+ boolean statusFinished = false;
+ int statusIndex = 0;
+ List<Boolean> avtiveList = new ArrayList<>(1440);
+
+ while (!totalFinished && !statusFinished) {
+ if (totalTsBlock == null || totalTsBlock.isEmpty()) {
+ Optional<TsBlock> optionalTsBlock = queryExecution1.getBatchResult();
+ if (!optionalTsBlock.isPresent()) {
+ totalFinished = true;
+ } else {
+ totalTsBlock = optionalTsBlock.get();
+ }
+ }
+
+ if (statusTsBlock == null || statusTsBlock.isEmpty()) {
+ Optional<TsBlock> optionalTsBlock = queryExecution2.getBatchResult();
+ if (!optionalTsBlock.isPresent()) {
+ statusFinished = true;
+ } else {
+ statusTsBlock = optionalTsBlock.get();
+ }
+ }
+
+ if (totalTsBlock != null
+ && !totalTsBlock.isEmpty()
+ && statusTsBlock != null
+ && !statusTsBlock.isEmpty()) {
+ BinaryColumn deviceColumn = (BinaryColumn) totalTsBlock.getColumn(0);
+ LongColumn totalColumn = (LongColumn) totalTsBlock.getColumn(1);
+ LongColumn statusColumn = (LongColumn) statusTsBlock.getColumn(1);
+ while (totalIndex < totalTsBlock.getPositionCount()
+ && statusIndex < statusTsBlock.getPositionCount()) {
+ long currentTotal = totalColumn.getLong(totalIndex);
+ long currentStatus = statusColumn.getLong(statusIndex);
+
+ avtiveList.add(currentStatus * 1.0d / currentTotal > 0.5);
+ if (avtiveList.size() == 1440) {
+ String deviceId =
+
deviceColumn.getBinary(totalIndex).getStringValue(StandardCharsets.UTF_8);
+ String model = deviceId.split("\\.")[MODEL_LEVEL];
+ DailyActivityValue value = map.computeIfAbsent(model, k -> new
DailyActivityValue());
+ boolean meetTrue = false;
+ for (boolean active : avtiveList) {
+ if (active) {
+ if (!meetTrue) {
+ meetTrue = true;
+ }
+ } else {
+ if (meetTrue) {
+ value.count++;
+ meetTrue = false;
+ }
+ }
+ }
+
+ if (meetTrue) {
+ value.count++;
+ }
+ avtiveList.clear();
+ }
+ totalIndex++;
+ statusIndex++;
+ }
+
+ if (totalIndex == totalTsBlock.getPositionCount()) {
+ totalTsBlock = null;
+ totalIndex = 0;
+ }
+
+ if (statusIndex == statusTsBlock.getPositionCount()) {
+ statusTsBlock = null;
+ statusIndex = 0;
+ }
+ }
+ }
+
+ int size = map.size();
+
+ TsBlockBuilder builder = new TsBlockBuilder(size,
BREAKDOWN_FREQUENCY_DATA_TYPES);
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ ColumnBuilder modelColumnBuilder = builder.getColumnBuilder(0);
+ ColumnBuilder breakDownFrequencyColumnBuilder =
builder.getColumnBuilder(1);
+
+ map.forEach(
+ (k, v) -> {
+ timeColumnBuilder.writeLong(0);
+ modelColumnBuilder.writeBinary(new Binary(k,
StandardCharsets.UTF_8));
+ breakDownFrequencyColumnBuilder.writeLong(v.count);
+ });
+
+ builder.declarePositions(size);
+
+ return new
Pair<>(Collections.singletonList(serde.serialize(builder.build())), true);
+ }
}