This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch tsbs/iot
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tsbs/iot by this push:
     new 8469dc0abfd init
8469dc0abfd is described below

commit 8469dc0abfdfe8ee7f61e59ac62a831fb830763d
Author: JackieTien97 <[email protected]>
AuthorDate: Sun Apr 28 20:40:15 2024 +0800

    init
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 170 ++++++++++++++++++++-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 106 +++++++++++++
 2 files changed, 275 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 320e8fc7bfb..998c289b1a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -218,6 +218,7 @@ import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_D
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_DAILY_DRIVING_SESSION;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_LOAD;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.AVG_VS_PROJECTED_FUEL_CONSUMPTION;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.BREAKDOWN_FREQUENCY;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.DAILY_ACTIVITY;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.HIGH_LOAD;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementType.LONG_DRIVING_SESSIONS;
@@ -487,6 +488,37 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   private static final List<Byte> DAILY_ACTIVITY_ALIAS_COLUMNS =
       new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
 
+  private static final String BREAKDOWN_FREQUENCY_SQL_TEMPLATE_1 =
+      "select count_time(*) from root.diagnostics.** GROUP BY([%d, %d), 10m) 
ALIGN BY DEVICE";
+
+  private static final String BREAKDOWN_FREQUENCY_SQL_TEMPLATE_2 =
+      "select count("
+          + STATUS_COLUMN_NAME
+          + ") from root.diagnostics.** where "
+          + STATUS_COLUMN_NAME
+          + " > 0 GROUP BY([%d, %d), 10m) ALIGN BY DEVICE";
+
+  private static final List<String> BREAKDOWN_FREQUENCY_HEADERS =
+      ImmutableList.of(MODEL_COLUMN_NAME, "breakdown_frequency");
+
+  private static final List<String> BREAKDOWN_FREQUENCY_HEADER_DATA_TYPES =
+      ImmutableList.of(TSDataType.TEXT.toString(), 
TSDataType.INT64.toString());
+
+  public static final List<TSDataType> BREAKDOWN_FREQUENCY_DATA_TYPES =
+      ImmutableList.of(TSDataType.TEXT, TSDataType.INT64);
+
+  private static final Map<String, Integer> 
BREAKDOWN_FREQUENCY_COLUMN_NAME_INDEX_MAP =
+      new HashMap<>();
+
+  static {
+    for (int i = 0; i < BREAKDOWN_FREQUENCY_HEADERS.size(); i++) {
+      
BREAKDOWN_FREQUENCY_COLUMN_NAME_INDEX_MAP.put(BREAKDOWN_FREQUENCY_HEADERS.get(i),
 i);
+    }
+  }
+
+  private static final List<Byte> BREAKDOWN_FREQUENCY_ALIAS_COLUMNS =
+      new ArrayList<>(Bytes.asList(new BitSet().toByteArray()));
+
   private static final Map<String, DeviceAttributes> DEVICE_ATTRIBUTES_MAP = 
new HashMap<>();
 
   public static class DeviceAttributes {
@@ -3382,7 +3414,143 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   @Override
   public TSExecuteStatementResp breakdownFrequency(TSBreakdownFrequencyReq 
req) throws TException {
-    return null;
+    boolean finished = false;
+    long queryId1 = Long.MIN_VALUE;
+    long queryId2 = Long.MAX_VALUE;
+    String statement1 =
+        String.format(BREAKDOWN_FREQUENCY_SQL_TEMPLATE_1, req.startTime, 
req.endTime);
+    String statement2 =
+        String.format(BREAKDOWN_FREQUENCY_SQL_TEMPLATE_2, req.startTime, 
req.endTime);
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    // quota
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    StatementType statementType = null;
+    Throwable t = null;
+    try {
+      Statement s1 = StatementGenerator.createStatement(statement1, 
clientSession.getZoneId());
+      Statement s2 = StatementGenerator.createStatement(statement2, 
clientSession.getZoneId());
+
+      if (s1 == null || s2 == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not 
supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s1, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s1);
+      statementType = s1.getType();
+      if (ENABLE_AUDIT_LOG) {
+        AuditLogger.log(statement1, s1);
+      }
+
+      queryId1 = SESSION_MANAGER.requestQueryId(clientSession, 
req.statementId);
+      queryId2 = SESSION_MANAGER.requestQueryId(clientSession, 
req.statementId);
+
+      // create and cache dataset
+      ExecutionResult result1 =
+          COORDINATOR.executeForTreeModel(
+              s1,
+              queryId1,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              statement1,
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      if (result1.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result1.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(result1.status);
+      }
+
+      ExecutionResult result2 =
+          COORDINATOR.executeForTreeModel(
+              s2,
+              queryId2,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              statement2,
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      if (result2.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result2.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(result2.status);
+      }
+
+      IQueryExecution queryExecution1 = 
COORDINATOR.getQueryExecution(queryId1);
+      IQueryExecution queryExecution2 = 
COORDINATOR.getQueryExecution(queryId2);
+
+      try (SetThreadName threadName = new 
SetThreadName(result1.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution1 != null && queryExecution1.isQuery()) {
+          resp = 
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+          
resp.setColumnNameIndexMap(BREAKDOWN_FREQUENCY_COLUMN_NAME_INDEX_MAP);
+          resp.setSgColumns(Collections.emptyList());
+          resp.setColumns(BREAKDOWN_FREQUENCY_HEADERS);
+          resp.setDataTypeList(BREAKDOWN_FREQUENCY_HEADER_DATA_TYPES);
+          resp.setAliasColumns(BREAKDOWN_FREQUENCY_ALIAS_COLUMNS);
+          resp.setIgnoreTimeStamp(true);
+          resp.setQueryId(queryId1);
+          resp.setStatus(result1.status);
+          Pair<List<ByteBuffer>, Boolean> pair =
+              QueryDataSetUtils.constructBreakdownFrequencyResult(
+                  queryExecution1, queryExecution2, serde);
+          finished = pair.right;
+          resp.setQueryResult(pair.left);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result1.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "breakdown frequency " + 
OperationType.EXECUTE_STATEMENT));
+    } catch (Error error) {
+      t = error;
+      throw error;
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      COORDINATOR.recordExecutionTime(queryId1, currentOperationCost);
+      COORDINATOR.recordExecutionTime(queryId2, currentOperationCost);
+
+      // record each operation time cost
+      if (statementType != null) {
+        addStatementExecutionLatency(
+            OperationType.EXECUTE_QUERY_STATEMENT,
+            BREAKDOWN_FREQUENCY.name(),
+            currentOperationCost);
+      }
+
+      if (finished) {
+        // record total time cost for one query
+        long executionTime = COORDINATOR.getTotalExecutionTime(queryId1);
+        addQueryLatency(
+            BREAKDOWN_FREQUENCY, executionTime > 0 ? executionTime : 
currentOperationCost);
+        COORDINATOR.cleanupQueryExecution(queryId1, req, t);
+        COORDINATOR.cleanupQueryExecution(queryId2, req, t);
+      }
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 7dd872122f2..4b084af334b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -64,6 +64,7 @@ import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_
 import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_DAILY_DRIVING_SESSION_DATA_TYPES;
 import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_LOAD_DATA_TYPES;
 import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.AVG_VS_PROJ_FUEL_CONSUMPTION_DATA_TYPES;
+import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.BREAKDOWN_FREQUENCY_DATA_TYPES;
 import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.DAILY_ACTIVITY_DATA_TYPES;
 import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.DRIVER_LEVEL;
 import static 
org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.FLEET_LEVEL;
@@ -1468,4 +1469,109 @@ public class QueryDataSetUtils {
   private static class DailyActivityValue {
     long count = 0;
   }
+
+  public static Pair<List<ByteBuffer>, Boolean> 
constructBreakdownFrequencyResult(
+      IQueryExecution queryExecution1, IQueryExecution queryExecution2, 
TsBlockSerde serde)
+      throws IoTDBException, IOException {
+
+    Map<String, DailyActivityValue> map = new HashMap<>();
+
+    TsBlock totalTsBlock = null;
+    boolean totalFinished = false;
+    int totalIndex = 0;
+    TsBlock statusTsBlock = null;
+    boolean statusFinished = false;
+    int statusIndex = 0;
+    List<Boolean> avtiveList = new ArrayList<>(1440);
+
+    while (!totalFinished && !statusFinished) {
+      if (totalTsBlock == null || totalTsBlock.isEmpty()) {
+        Optional<TsBlock> optionalTsBlock = queryExecution1.getBatchResult();
+        if (!optionalTsBlock.isPresent()) {
+          totalFinished = true;
+        } else {
+          totalTsBlock = optionalTsBlock.get();
+        }
+      }
+
+      if (statusTsBlock == null || statusTsBlock.isEmpty()) {
+        Optional<TsBlock> optionalTsBlock = queryExecution2.getBatchResult();
+        if (!optionalTsBlock.isPresent()) {
+          statusFinished = true;
+        } else {
+          statusTsBlock = optionalTsBlock.get();
+        }
+      }
+
+      if (totalTsBlock != null
+          && !totalTsBlock.isEmpty()
+          && statusTsBlock != null
+          && !statusTsBlock.isEmpty()) {
+        BinaryColumn deviceColumn = (BinaryColumn) totalTsBlock.getColumn(0);
+        LongColumn totalColumn = (LongColumn) totalTsBlock.getColumn(1);
+        LongColumn statusColumn = (LongColumn) statusTsBlock.getColumn(1);
+        while (totalIndex < totalTsBlock.getPositionCount()
+            && statusIndex < statusTsBlock.getPositionCount()) {
+          long currentTotal = totalColumn.getLong(totalIndex);
+          long currentStatus = statusColumn.getLong(statusIndex);
+
+          avtiveList.add(currentStatus * 1.0d / currentTotal > 0.5);
+          if (avtiveList.size() == 1440) {
+            String deviceId =
+                
deviceColumn.getBinary(totalIndex).getStringValue(StandardCharsets.UTF_8);
+            String model = deviceId.split("\\.")[MODEL_LEVEL];
+            DailyActivityValue value = map.computeIfAbsent(model, k -> new 
DailyActivityValue());
+            boolean meetTrue = false;
+            for (boolean active : avtiveList) {
+              if (active) {
+                if (!meetTrue) {
+                  meetTrue = true;
+                }
+              } else {
+                if (meetTrue) {
+                  value.count++;
+                  meetTrue = false;
+                }
+              }
+            }
+
+            if (meetTrue) {
+              value.count++;
+            }
+            avtiveList.clear();
+          }
+          totalIndex++;
+          statusIndex++;
+        }
+
+        if (totalIndex == totalTsBlock.getPositionCount()) {
+          totalTsBlock = null;
+          totalIndex = 0;
+        }
+
+        if (statusIndex == statusTsBlock.getPositionCount()) {
+          statusTsBlock = null;
+          statusIndex = 0;
+        }
+      }
+    }
+
+    int size = map.size();
+
+    TsBlockBuilder builder = new TsBlockBuilder(size, 
BREAKDOWN_FREQUENCY_DATA_TYPES);
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    ColumnBuilder modelColumnBuilder = builder.getColumnBuilder(0);
+    ColumnBuilder breakDownFrequencyColumnBuilder = 
builder.getColumnBuilder(1);
+
+    map.forEach(
+        (k, v) -> {
+          timeColumnBuilder.writeLong(0);
+          modelColumnBuilder.writeBinary(new Binary(k, 
StandardCharsets.UTF_8));
+          breakDownFrequencyColumnBuilder.writeLong(v.count);
+        });
+
+    builder.declarePositions(size);
+
+    return new 
Pair<>(Collections.singletonList(serde.serialize(builder.build())), true);
+  }
 }

Reply via email to