This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch caLastOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 89b4a2a57c28d8fbafc918ac3837e7fe95fe0c2e
Author: JackieTien97 <[email protected]>
AuthorDate: Tue May 16 19:34:22 2023 +0800

    Add RPC interface
---
 .../iotdb/db/mpp/execution/driver/Driver.java      |  21 ++-
 .../mpp/metric/TimeSeriesMetadataCacheMetrics.java |   4 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 187 +++++++++++++++++++++
 thrift/src/main/thrift/client.thrift               |  15 ++
 4 files changed, 217 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 2ce330e976..b764a501b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -36,7 +37,9 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -381,15 +384,17 @@ public abstract class Driver implements IDriver {
 
       sink.setNoMoreTsBlocks();
 
+      Map<String, Long> operatorType2TotalCost = new HashMap<>();
       // record operator execution statistics to metrics
-      //      List<OperatorContext> operatorContexts = 
driverContext.getOperatorContexts();
-      //      for (OperatorContext operatorContext : operatorContexts) {
-      //        String operatorType = operatorContext.getOperatorType();
-      //        QUERY_METRICS.recordOperatorExecutionCost(
-      //            operatorType, 
operatorContext.getTotalExecutionTimeInNanos());
-      //        QUERY_METRICS.recordOperatorExecutionCount(
-      //            operatorType, operatorContext.getNextCalledCount());
-      //      }
+      List<OperatorContext> operatorContexts = 
driverContext.getOperatorContexts();
+      for (OperatorContext operatorContext : operatorContexts) {
+        String operatorType = operatorContext.getOperatorType();
+        operatorType2TotalCost.merge(
+            operatorType, operatorContext.getTotalExecutionTimeInNanos(), 
Long::sum);
+      }
+      for (Map.Entry<String, Long> entry : operatorType2TotalCost.entrySet()) {
+        QUERY_METRICS.recordOperatorExecutionCost(entry.getKey(), 
entry.getValue());
+      }
     } catch (InterruptedException t) {
       // don't record the stack
       wasInterrupted = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/TimeSeriesMetadataCacheMetrics.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/TimeSeriesMetadataCacheMetrics.java
index 965ed5483e..7a0ef0e4b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/metric/TimeSeriesMetadataCacheMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/TimeSeriesMetadataCacheMetrics.java
@@ -43,14 +43,14 @@ public class TimeSeriesMetadataCacheMetrics implements 
IMetricSet {
         Metric.CACHE_HIT.toString(),
         MetricLevel.IMPORTANT,
         timeSeriesMetadataCache,
-        TimeSeriesMetadataCache::calculateTimeSeriesMetadataHitRatio,
+        h -> h.calculateTimeSeriesMetadataHitRatio() * 100,
         Tag.NAME.toString(),
         "timeSeriesMeta");
     metricService.createAutoGauge(
         Metric.CACHE_HIT.toString(),
         MetricLevel.IMPORTANT,
         timeSeriesMetadataCache,
-        TimeSeriesMetadataCache::calculateBloomFilterHitRatio,
+        h -> h.calculateBloomFilterHitRatio() * 100,
         Tag.NAME.toString(),
         "bloomFilter");
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 01bf923362..434acf1192 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -18,11 +18,15 @@
  */
 package org.apache.iotdb.db.service.thrift.impl;
 
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -32,8 +36,11 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -91,6 +98,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
@@ -118,8 +126,12 @@ import 
org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
@@ -130,11 +142,13 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -158,6 +172,10 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   private final ISchemaFetcher schemaFetcher;
 
+  private final TsBlockSerde serde = new TsBlockSerde();
+
+  private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE = 
DataNodeSchemaCache.getInstance();
+
   @FunctionalInterface
   public interface SelectResult {
     boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, 
int fetchSize)
@@ -552,6 +570,175 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     return executeLastDataQueryInternal(req, SELECT_RESULT);
   }
 
+  /**
+   * Executes a last-value query for a set of sensors that all belong to one device.
+   *
+   * <p>Fast path: the data partition of the device is fetched, and if the first data node location
+   * of the last returned region replica set is this node, every sensor is looked up in the
+   * DataNode schema cache. When all sensors have a last-cache entry the response is built directly
+   * from the cache and returned as a complete result.
+   *
+   * <p>Slow path: on any cache miss (or when the region is not local) the request is converted to a
+   * generic last data query and executed through the coordinator, including the authority and
+   * quota checks.
+   *
+   * @param req the single-device last query request
+   * @return the query response; failures caught as {@link Exception} are translated into an error
+   *     status instead of being thrown
+   * @throws TException as declared by the RPC service signature
+   */
+  @Override
+  public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
+      TSFastLastDataQueryForOneDeviceReq req) throws TException {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+    long startTime = System.currentTimeMillis();
+    long startNanoTime = System.nanoTime();
+    Throwable t = null;
+    try {
+      String db;
+      String deviceId;
+      PartialPath devicePath;
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+
+      if (req.isLegalPathNodes()) {
+        // the client guarantees the nodes are legal, so they are used without re-parsing
+        db = req.db;
+        deviceId = req.deviceId;
+        devicePath = new PartialPath(deviceId.split("\\."));
+      } else {
+        // parse the raw strings as paths and use their full-path form
+        db = new PartialPath(req.db).getFullPath();
+        devicePath = new PartialPath(req.deviceId);
+        deviceId = devicePath.getFullPath();
+      }
+
+      DataPartitionQueryParam queryParam =
+          new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, true);
+      DataPartition dataPartition =
+          partitionFetcher.getDataPartitionWithUnclosedTimeRange(
+              Collections.singletonMap(db, Collections.singletonList(queryParam)));
+      List<TRegionReplicaSet> regionReplicaSets =
+          dataPartition.getDataRegionReplicaSet(deviceId, Collections.emptyList());
+      // if the device's latest dataRegion's leader is current node, we can directly read from cache
+      if (!regionReplicaSets.isEmpty()
+          && isSameNode(
+              regionReplicaSets
+                  .get(regionReplicaSets.size() - 1)
+                  .dataNodeLocations
+                  .get(0)
+                  .mPPDataExchangeEndPoint)) {
+        // NOTE(review): this branch returns before AuthorityChecker.checkAuthority and the quota
+        // check below are reached - confirm that serving cached values without them is intended
+        int sensorNum = req.sensors.size();
+        TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum);
+        boolean allCached = true;
+        for (String sensor : req.sensors) {
+          PartialPath fullPath;
+          if (req.isLegalPathNodes()) {
+            fullPath = devicePath.concatNode(sensor);
+          } else {
+            fullPath = devicePath.concatNode((new PartialPath(sensor)).getFullPath());
+          }
+          TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(fullPath);
+          if (timeValuePair == null) {
+            // no cache entry at all: fall back to the regular query path
+            allCached = false;
+            break;
+          } else if (timeValuePair.getValue() != null) {
+            // a cached pair with a null value means there is no data for this sensor, so only
+            // pairs carrying a value produce a row; we don't consider TTL
+            LastQueryUtil.appendLastValue(
+                builder,
+                timeValuePair.getTimestamp(),
+                new Binary(fullPath.getFullPath()),
+                timeValuePair.getValue().getStringValue(),
+                timeValuePair.getValue().getDataType().name());
+          }
+        }
+        // cache hit
+        if (allCached) {
+          TSExecuteStatementResp resp =
+              createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId);
+          resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""));
+          resp.setQueryResult(Collections.singletonList(serde.serialize(builder.build())));
+          finished = true;
+          // the whole result is in this response and the query is cleaned up in finally
+          // (finished == true), so the client must not be told to fetch more data
+          resp.setMoreData(false);
+          return resp;
+        }
+      }
+
+      // cache miss
+      Statement s = StatementGenerator.createStatement(convert(req), clientSession.getZoneId());
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+
+      if (enableAuditLog) {
+        AuditLogger.log(String.format("Last Data Query: %s", req), s);
+      }
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "",
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("error code: " + result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          finished = SELECT_RESULT.apply(resp, queryExecution, req.fetchSize);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+
+    } catch (Exception e) {
+      // a failed query is finished as well, so that it is cleaned up in finally
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+    } catch (Error error) {
+      t = error;
+      throw error;
+    } finally {
+      COORDINATOR.recordExecutionTime(queryId, System.currentTimeMillis() - startTime);
+      if (finished) {
+        addStatementExecutionLatency(
+            OperationType.EXECUTE_LAST_DATA_QUERY,
+            StatementType.QUERY,
+            System.nanoTime() - startNanoTime,
+            TimeUnit.NANOSECONDS);
+        COORDINATOR.cleanupQueryExecution(queryId, t);
+      }
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
+  /**
+   * Converts a single-device fast last query request into the generic {@link TSLastDataQueryReq}
+   * that the regular last-query execution path understands.
+   *
+   * <p>Every sensor name is prefixed with {@code req.deviceId} and a dot to form its full path;
+   * fetch size, redirect flag, legal-path-nodes flag and timeout are copied over unchanged.
+   *
+   * @param req the single-device request to convert
+   * @return the populated generic last data query request
+   */
+  private TSLastDataQueryReq convert(TSFastLastDataQueryForOneDeviceReq req) {
+    // only presize the list: copying req.sensors into it would leave the bare sensor names in
+    // front of the full paths
+    List<String> paths = new ArrayList<>(req.sensors.size());
+    for (String sensor : req.sensors) {
+      paths.add(req.deviceId + "." + sensor);
+    }
+    // NOTE(review): the third constructor argument is kept at 0 as before; presumably it is the
+    // time bound of the last query - confirm against TSLastDataQueryReq
+    TSLastDataQueryReq tsLastDataQueryReq =
+        new TSLastDataQueryReq(req.sessionId, paths, 0, req.statementId);
+    tsLastDataQueryReq.setFetchSize(req.fetchSize);
+    tsLastDataQueryReq.setEnableRedirectQuery(req.enableRedirectQuery);
+    tsLastDataQueryReq.setLegalPathNodes(req.legalPathNodes);
+    tsLastDataQueryReq.setTimeout(req.timeout);
+    // return the populated request, not a fresh empty one
+    return tsLastDataQueryReq;
+  }
+
   @Override
   public TSExecuteStatementResp 
executeAggregationQueryV2(TSAggregationQueryReq req) {
     return executeAggregationQueryInternal(req, SELECT_RESULT);
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 6779bbf89c..6e0f3fcb99 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -338,6 +338,19 @@ struct TSLastDataQueryReq {
   9: optional bool legalPathNodes
 }
 
+// Request of executeFastLastDataQueryForOneDeviceV2: a last-value query restricted to the
+// sensors of a single device.
+struct TSFastLastDataQueryForOneDeviceReq {
+  1: required i64 sessionId
+  // database name; the server uses it as the key when fetching the device's data partition
+  2: required string db
+  // path of the device that owns all requested sensors
+  3: required string deviceId
+  // sensor names; the server appends each one to deviceId to build the full series path
+  4: required list<string> sensors
+  5: optional i32 fetchSize
+  6: required i64 statementId
+  7: optional bool enableRedirectQuery
+  8: optional bool jdbcQuery
+  9: optional i64 timeout
+  // when true the server uses db/deviceId/sensors as given; otherwise it parses them as paths first
+  10: optional bool legalPathNodes
+}
+
 struct TSAggregationQueryReq {
   1: required i64 sessionId
   2: required i64 statementId
@@ -491,6 +504,8 @@ service IClientRPCService {
 
   TSExecuteStatementResp executeLastDataQueryV2(1:TSLastDataQueryReq req);
 
+  TSExecuteStatementResp 
executeFastLastDataQueryForOneDeviceV2(1:TSFastLastDataQueryForOneDeviceReq 
req);
+
   TSExecuteStatementResp executeAggregationQueryV2(1:TSAggregationQueryReq 
req);
 
   TSFetchResultsResp fetchResultsV2(1:TSFetchResultsReq req);

Reply via email to