This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch caLastOpt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 89b4a2a57c28d8fbafc918ac3837e7fe95fe0c2e Author: JackieTien97 <[email protected]> AuthorDate: Tue May 16 19:34:22 2023 +0800 Add RPC interface --- .../iotdb/db/mpp/execution/driver/Driver.java | 21 ++- .../mpp/metric/TimeSeriesMetadataCacheMetrics.java | 4 +- .../service/thrift/impl/ClientRPCServiceImpl.java | 187 +++++++++++++++++++++ thrift/src/main/thrift/client.thrift | 15 ++ 4 files changed, 217 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java index 2ce330e976..b764a501b3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink; import org.apache.iotdb.db.mpp.execution.operator.Operator; +import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -36,7 +37,9 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; import java.io.File; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -381,15 +384,17 @@ public abstract class Driver implements IDriver { sink.setNoMoreTsBlocks(); + Map<String, Long> operatorType2TotalCost = new HashMap<>(); // record operator execution statistics to metrics - // List<OperatorContext> operatorContexts = driverContext.getOperatorContexts(); - // for (OperatorContext operatorContext : operatorContexts) { - // String operatorType = operatorContext.getOperatorType(); - // QUERY_METRICS.recordOperatorExecutionCost( - // operatorType, operatorContext.getTotalExecutionTimeInNanos()); - // QUERY_METRICS.recordOperatorExecutionCount( - // operatorType, operatorContext.getNextCalledCount()); - // } + List<OperatorContext> operatorContexts = driverContext.getOperatorContexts(); + for (OperatorContext operatorContext : operatorContexts) { + String operatorType = operatorContext.getOperatorType(); + operatorType2TotalCost.merge( + operatorType, operatorContext.getTotalExecutionTimeInNanos(), Long::sum); + } + for (Map.Entry<String, Long> entry : operatorType2TotalCost.entrySet()) { + QUERY_METRICS.recordOperatorExecutionCost(entry.getKey(), entry.getValue()); + } } catch (InterruptedException t) { // don't record the stack wasInterrupted = true; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/TimeSeriesMetadataCacheMetrics.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/TimeSeriesMetadataCacheMetrics.java index 965ed5483e..7a0ef0e4b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/metric/TimeSeriesMetadataCacheMetrics.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/TimeSeriesMetadataCacheMetrics.java @@ -43,14 +43,14 @@ public class TimeSeriesMetadataCacheMetrics implements IMetricSet { Metric.CACHE_HIT.toString(), MetricLevel.IMPORTANT, timeSeriesMetadataCache, - TimeSeriesMetadataCache::calculateTimeSeriesMetadataHitRatio, + h -> h.calculateTimeSeriesMetadataHitRatio() * 100, Tag.NAME.toString(), "timeSeriesMeta"); metricService.createAutoGauge( Metric.CACHE_HIT.toString(), MetricLevel.IMPORTANT, timeSeriesMetadataCache, - TimeSeriesMetadataCache::calculateBloomFilterHitRatio, + h -> h.calculateBloomFilterHitRatio() * 100, Tag.NAME.toString(), "bloomFilter"); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 01bf923362..434acf1192 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -18,11 +18,15 @@ */ package org.apache.iotdb.db.service.thrift.impl; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.partition.DataPartition; +import org.apache.iotdb.commons.partition.DataPartitionQueryParam; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; @@ -32,8 +36,11 @@ import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.OperationType; +import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache; import org.apache.iotdb.db.metadata.template.TemplateQueryType; import org.apache.iotdb.db.mpp.common.header.DatasetHeader; +import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory; +import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil; import org.apache.iotdb.db.mpp.plan.Coordinator; import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; @@ -91,6 +98,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; +import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq; import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; @@ -118,8 +126,12 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq; import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo; +import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde; +import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.thrift.TException; @@ -130,11 +142,13 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; @@ -158,6 +172,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private final ISchemaFetcher schemaFetcher; + private final TsBlockSerde serde = new TsBlockSerde(); + + private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE = DataNodeSchemaCache.getInstance(); + @FunctionalInterface public interface SelectResult { boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize) @@ -552,6 +570,175 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return executeLastDataQueryInternal(req, SELECT_RESULT); } + @Override + public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2( + TSFastLastDataQueryForOneDeviceReq req) throws TException { + boolean finished = false; + long queryId = Long.MIN_VALUE; + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + OperationQuota quota = null; + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); + } + long startTime = System.currentTimeMillis(); + long startNanoTime = System.nanoTime(); + Throwable t = null; + try { + String db; + String deviceId; + PartialPath devicePath; + + queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); + + if (req.isLegalPathNodes()) { + db = req.db; + deviceId = req.deviceId; + devicePath = new PartialPath(deviceId.split("\\.")); + } else { + db = new PartialPath(req.db).getFullPath(); + devicePath = new PartialPath(req.deviceId); + deviceId = devicePath.getFullPath(); + } + + DataPartitionQueryParam queryParam = + new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, true); + DataPartition dataPartition = + partitionFetcher.getDataPartitionWithUnclosedTimeRange( + Collections.singletonMap(db, Collections.singletonList(queryParam))); + List<TRegionReplicaSet> regionReplicaSets = + dataPartition.getDataRegionReplicaSet(deviceId, Collections.emptyList()); + // if the device's latest dataRegion's leader is current node, we can directly read from cache + if (!regionReplicaSets.isEmpty() + && isSameNode( + regionReplicaSets + .get(regionReplicaSets.size() - 1) + .dataNodeLocations + .get(0) + .mPPDataExchangeEndPoint)) { + int sensorNum = req.sensors.size(); + TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum); + boolean allCached = true; + for (String sensor : req.sensors) { + PartialPath fullPath; + if (req.isLegalPathNodes()) { + fullPath = devicePath.concatNode(sensor); + } else { + fullPath = devicePath.concatNode((new PartialPath(sensor)).getFullPath()); + } + TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(fullPath); + if (timeValuePair == null) { + allCached = false; + break; + } else if (timeValuePair.getValue() == null) { + // there is no data for this sensor + } else { + // we don't consider TTL + LastQueryUtil.appendLastValue( + builder, + timeValuePair.getTimestamp(), + new Binary(fullPath.getFullPath()), + timeValuePair.getValue().getStringValue(), + timeValuePair.getValue().getDataType().name()); + } + } + // cache hit + if (allCached) { + TSExecuteStatementResp resp = + createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "")); + resp.setQueryResult(Collections.singletonList(serde.serialize(builder.build()))); + finished = true; + resp.setMoreData(true); + return resp; + } + } + + // cache miss + Statement s = StatementGenerator.createStatement(convert(req), clientSession.getZoneId()); + // permission check + TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return RpcUtils.getTSExecuteStatementResp(status); + } + + quota = + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + + if (enableAuditLog) { + AuditLogger.log(String.format("Last Data Query: %s", req), s); + } + // create and cache dataset + ExecutionResult result = + COORDINATOR.execute( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher, + req.getTimeout()); + + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new RuntimeException("error code: " + result.status); + } + + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + TSExecuteStatementResp resp; + if (queryExecution.isQuery()) { + resp = createResponse(queryExecution.getDatasetHeader(), queryId); + resp.setStatus(result.status); + finished = SELECT_RESULT.apply(resp, queryExecution, req.fetchSize); + resp.setMoreData(!finished); + quota.addReadResult(resp.getQueryResult()); + } else { + resp = RpcUtils.getTSExecuteStatementResp(result.status); + } + return resp; + } + + } catch (Exception e) { + finished = true; + t = e; + return RpcUtils.getTSExecuteStatementResp( + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); + } catch (Error error) { + t = error; + throw error; + } finally { + COORDINATOR.recordExecutionTime(queryId, System.currentTimeMillis() - startTime); + if (finished) { + addStatementExecutionLatency( + OperationType.EXECUTE_LAST_DATA_QUERY, + StatementType.QUERY, + System.nanoTime() - startNanoTime, + TimeUnit.NANOSECONDS); + COORDINATOR.cleanupQueryExecution(queryId, t); + } + SESSION_MANAGER.updateIdleTime(); + if (quota != null) { + quota.close(); + } + } + } + + private TSLastDataQueryReq convert(TSFastLastDataQueryForOneDeviceReq req) { + TSLastDataQueryReq res = new TSLastDataQueryReq(); + List<String> paths = new ArrayList<>(req.sensors); + for (String sensor : req.sensors) { + paths.add(req.deviceId + "." + sensor); + } + TSLastDataQueryReq tsLastDataQueryReq = + new TSLastDataQueryReq(req.sessionId, paths, 0, req.statementId); + tsLastDataQueryReq.setFetchSize(req.fetchSize); + tsLastDataQueryReq.setEnableRedirectQuery(req.enableRedirectQuery); + tsLastDataQueryReq.setLegalPathNodes(req.legalPathNodes); + tsLastDataQueryReq.setTimeout(req.timeout); + return res; + } + @Override public TSExecuteStatementResp executeAggregationQueryV2(TSAggregationQueryReq req) { return executeAggregationQueryInternal(req, SELECT_RESULT); diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift index 6779bbf89c..6e0f3fcb99 100644 --- a/thrift/src/main/thrift/client.thrift +++ b/thrift/src/main/thrift/client.thrift @@ -338,6 +338,19 @@ struct TSLastDataQueryReq { 9: optional bool legalPathNodes } +struct TSFastLastDataQueryForOneDeviceReq { + 1: required i64 sessionId + 2: required string db + 3: required string deviceId + 4: required list<string> sensors + 5: optional i32 fetchSize + 6: required i64 statementId + 7: optional bool enableRedirectQuery + 8: optional bool jdbcQuery + 9: optional i64 timeout + 10: optional bool legalPathNodes +} + struct TSAggregationQueryReq { 1: required i64 sessionId 2: required i64 statementId @@ -491,6 +504,8 @@ service IClientRPCService { TSExecuteStatementResp executeLastDataQueryV2(1:TSLastDataQueryReq req); + TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(1:TSFastLastDataQueryForOneDeviceReq req); + TSExecuteStatementResp executeAggregationQueryV2(1:TSAggregationQueryReq req); TSFetchResultsResp fetchResultsV2(1:TSFetchResultsReq req);
