This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b5b7b70f6be Implemented fast last query for tree model with prefix path (#15678)
b5b7b70f6be is described below
commit b5b7b70f6be17186c85039b2304f0b9f5845c338
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 10 16:24:49 2025 +0800
Implemented fast last query for tree model with prefix path (#15678)
* Fast last query on local
* Remove debug settings 2
* Bug fix
* Update SessionExample.java
* Update SessionConnection.java
* Fix
* logger
* Update DualKeyCacheImpl.java
* Update DualKeyCacheImpl.java
---
.../main/java/org/apache/iotdb/SessionExample.java | 18 ++++-
.../java/org/apache/iotdb/isession/ISession.java | 4 +
.../apache/iotdb/isession/pool/ISessionPool.java | 4 +
.../java/org/apache/iotdb/session/Session.java | 6 ++
.../apache/iotdb/session/SessionConnection.java | 41 ++++++++++
.../org/apache/iotdb/session/pool/SessionPool.java | 27 +++++++
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 94 ++++++++++++++++++++++
.../cache/schema/dualkeycache/IDualKeyCache.java | 5 ++
.../schema/dualkeycache/impl/DualKeyCacheImpl.java | 21 +++++
.../fetcher/cache/TableDeviceCacheEntry.java | 13 +++
.../fetcher/cache/TableDeviceSchemaCache.java | 6 ++
.../relational/metadata/fetcher/cache/TableId.java | 4 +-
.../schemaengine/schemaregion/ISchemaRegion.java | 9 +++
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 64 ++++++++-------
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 10 +++
.../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 34 ++++++++
.../thrift-datanode/src/main/thrift/client.thrift | 12 +++
17 files changed, 341 insertions(+), 31 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index e894c08406f..9d4c1f700ee 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.isession.SessionDataSet.DataIterator;
import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
@@ -66,7 +67,7 @@ public class SessionExample {
private static Random random = new Random();
public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
session =
new Session.Builder()
.host(LOCAL_HOST)
@@ -119,6 +120,7 @@ public class SessionExample {
sessionEnableRedirect.setFetchSize(10000);
fastLastDataQueryForOneDevice();
+ fastLastDataQueryForOnePrefix();
insertRecord4Redirect();
query4Redirect();
sessionEnableRedirect.close();
@@ -715,6 +717,20 @@ public class SessionExample {
}
}
+ private static void fastLastDataQueryForOnePrefix()
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
+ System.out.println("-------fastLastQueryForOnePrefix------");
+ try (SessionDataSet sessionDataSet =
+ sessionEnableRedirect.executeFastLastDataQueryForOnePrefixPath(
+ Arrays.asList("root", "sg1"))) {
+ System.out.println(sessionDataSet.getColumnNames());
+ sessionDataSet.setFetchSize(1024);
+ while (sessionDataSet.hasNext()) {
+ System.out.println(sessionDataSet.next());
+ }
+ }
+ }
+
private static void aggregationQuery()
throws IoTDBConnectionException, StatementExecutionException {
List<String> paths = new ArrayList<>();
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 09b1444b4fc..3d6cc6f3754 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.SystemStatus;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -185,6 +186,9 @@ public interface ISession extends AutoCloseable {
SessionDataSet executeLastDataQuery(List<String> paths)
throws StatementExecutionException, IoTDBConnectionException;
+ SessionDataSet executeFastLastDataQueryForOnePrefixPath(final List<String>
prefixes)
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException;
+
SessionDataSet executeLastDataQueryForOneDevice(
String db, String device, List<String> sensors, boolean isLegalPathNodes)
throws StatementExecutionException, IoTDBConnectionException;
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index eeb6509e89d..1193435889d 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.SystemStatus;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -486,6 +487,9 @@ public interface ISessionPool {
String db, String device, List<String> sensors, boolean isLegalPathNodes)
throws StatementExecutionException, IoTDBConnectionException;
+ SessionDataSetWrapper executeFastLastDataQueryForOnePrefixPath(final
List<String> prefixes)
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException;
+
SessionDataSetWrapper executeAggregationQuery(
List<String> paths, List<TAggregationType> aggregations)
throws StatementExecutionException, IoTDBConnectionException;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 8283f2319a1..95f6e2df65e 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1105,6 +1105,12 @@ public class Session implements ISession {
return executeLastDataQuery(paths, time, queryTimeoutInMs);
}
+ @Override
+ public SessionDataSet executeFastLastDataQueryForOnePrefixPath(final
List<String> prefixes)
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
+ return
defaultSessionConnection.executeLastDataQueryForOnePrefixPath(prefixes);
+ }
+
@Override
public SessionDataSet executeLastDataQueryForOneDevice(
String db, String device, List<String> sensors, boolean isLegalPathNodes)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index cfef7bfb6f7..68947283ce2 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
+import
org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOnePrefixPathReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -495,6 +496,46 @@ public class SessionConnection {
execResp.getColumnIndex2TsBlockColumnIndexList());
}
+ protected SessionDataSet executeLastDataQueryForOnePrefixPath(final
List<String> prefixes)
+ throws StatementExecutionException, IoTDBConnectionException,
RedirectException {
+ TSFastLastDataQueryForOnePrefixPathReq req =
+ new TSFastLastDataQueryForOnePrefixPathReq(sessionId, prefixes,
statementId);
+ req.setFetchSize(session.fetchSize);
+ req.setEnableRedirectQuery(enableRedirect);
+
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ req.setSessionId(sessionId);
+ req.setStatementId(statementId);
+ return client.executeFastLastDataQueryForOnePrefixPath(req);
+ });
+ final TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+
+ if (result.getRetryAttempts() == 0) {
+
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+ } else {
+ RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
+ }
+
+ return new SessionDataSet(
+ "",
+ tsExecuteStatementResp.getColumns(),
+ tsExecuteStatementResp.getDataTypeList(),
+ tsExecuteStatementResp.columnNameIndexMap,
+ tsExecuteStatementResp.getQueryId(),
+ statementId,
+ client,
+ sessionId,
+ tsExecuteStatementResp.queryResult,
+ tsExecuteStatementResp.isIgnoreTimeStamp(),
+ tsExecuteStatementResp.moreData,
+ zoneId,
+ timeFactor,
+ false,
+ tsExecuteStatementResp.getColumnIndex2TsBlockColumnIndexList());
+ }
+
protected Pair<SessionDataSet, TEndPoint> executeLastDataQueryForOneDevice(
String db, String device, List<String> sensors, boolean
isLegalPathNodes, long timeOut)
throws StatementExecutionException, IoTDBConnectionException {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 3eefa646546..1ed3d7a8531 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -3214,6 +3214,33 @@ public class SessionPool implements ISessionPool {
return null;
}
+ @Override
+ public SessionDataSetWrapper executeFastLastDataQueryForOnePrefixPath(final
List<String> prefixes)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ ISession session = getSession();
+ try {
+ SessionDataSet resp =
session.executeFastLastDataQueryForOnePrefixPath(prefixes);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp,
session, this);
+ occupy(session);
+ return wrapper;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ LOGGER.warn("executeLastDataQuery failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(EXECUTE_LASTDATAQUERY_ERROR, e);
+ putBack(session);
+ throw new RuntimeException(e);
+ }
+ }
+ // never go here
+ return null;
+ }
+
@Override
public SessionDataSetWrapper executeLastDataQueryForOneDevice(
String db, String device, List<String> sensors, boolean isLegalPathNodes)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index cd23cb4e3d0..7c11bb54b38 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -89,6 +89,9 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableId;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetSqlDialect;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
@@ -116,6 +119,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.SetSchem
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateTableViewStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSqlDialectStatement;
+import org.apache.iotdb.db.schemaengine.SchemaEngine;
+import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.template.TemplateQueryType;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -151,6 +156,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
+import
org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOnePrefixPathReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
@@ -187,6 +193,7 @@ import org.apache.thrift.TException;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
@@ -926,6 +933,93 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
return executeLastDataQueryInternal(req, SELECT_RESULT);
}
+ @Override
+ public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath(
+ final TSFastLastDataQueryForOnePrefixPathReq req) {
+ final IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ try {
+ final long queryId = SESSION_MANAGER.requestQueryId(clientSession,
req.statementId);
+ // 1. Map<Device, String[] measurements> ISchemaFetcher.getAllSensors(prefix) ~= 50ms
+
+ final PartialPath prefixPath = new
PartialPath(req.getPrefixes().toArray(new String[0]));
+ final Map<TableId, Map<IDeviceID, Map<String, Pair<TSDataType,
TimeValuePair>>>> resultMap =
+ new HashMap<>();
+ int sensorNum = 0;
+
+ final String prefixString = prefixPath.toString();
+ for (final ISchemaRegion region :
SchemaEngine.getInstance().getAllSchemaRegions()) {
+ if (!prefixString.startsWith(region.getDatabaseFullPath())
+ && !region.getDatabaseFullPath().startsWith(prefixString)) {
+ continue;
+ }
+ sensorNum += region.fillLastQueryMap(prefixPath, resultMap);
+ }
+
+ // 2.DATA_NODE_SCHEMA_CACHE.getLastCache()
+ if (!TableDeviceSchemaCache.getInstance().getLastCache(resultMap)) {
+ // 2.1 any sensor miss cache, construct last query sql, then return
+ return executeLastDataQueryInternal(convert(req), SELECT_RESULT);
+ }
+
+ // 2.2 all sensors hit cache, return response ~= 20ms
+ final TsBlockBuilder builder =
LastQueryUtil.createTsBlockBuilder(sensorNum);
+
+ for (final Map.Entry<TableId, Map<IDeviceID, Map<String,
Pair<TSDataType, TimeValuePair>>>>
+ result : resultMap.entrySet()) {
+ for (final Map.Entry<IDeviceID, Map<String, Pair<TSDataType,
TimeValuePair>>>
+ device2MeasurementLastEntry : result.getValue().entrySet()) {
+ final String deviceWithSeparator =
+ device2MeasurementLastEntry.getKey().toString() +
TsFileConstant.PATH_SEPARATOR;
+ for (final Map.Entry<String, Pair<TSDataType, TimeValuePair>>
measurementLastEntry :
+ device2MeasurementLastEntry.getValue().entrySet()) {
+ final TimeValuePair tvPair =
measurementLastEntry.getValue().getRight();
+ if (tvPair != TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR) {
+ LastQueryUtil.appendLastValue(
+ builder,
+ tvPair.getTimestamp(),
+ deviceWithSeparator + measurementLastEntry.getKey(),
+ tvPair.getValue().getStringValue(),
+ measurementLastEntry.getValue().getLeft().name());
+ }
+ }
+ }
+ }
+
+ final TSExecuteStatementResp resp =
+ createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId);
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""));
+ if (builder.isEmpty()) {
+ resp.setQueryResult(Collections.emptyList());
+ } else {
+
resp.setQueryResult(Collections.singletonList(serde.serialize(builder.build())));
+ }
+
+ resp.setMoreData(false);
+ return resp;
+ } catch (final Exception e) {
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + req + "\". " +
OperationType.EXECUTE_LAST_DATA_QUERY));
+ }
+ }
+
+ private TSLastDataQueryReq convert(final
TSFastLastDataQueryForOnePrefixPathReq req) {
+ TSLastDataQueryReq tsLastDataQueryReq =
+ new TSLastDataQueryReq(
+ req.sessionId,
+ Collections.singletonList(String.join(".", req.getPrefixes()) +
".**"),
+ Long.MIN_VALUE,
+ req.statementId);
+ tsLastDataQueryReq.setFetchSize(req.fetchSize);
+ tsLastDataQueryReq.setEnableRedirectQuery(req.enableRedirectQuery);
+ tsLastDataQueryReq.setLegalPathNodes(true);
+ tsLastDataQueryReq.setTimeout(req.timeout);
+ return tsLastDataQueryReq;
+ }
+
@Override
public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
TSFastLastDataQueryForOneDeviceReq req) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
index 9552a0f6ea6..1a2d788c9f7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache;
import javax.annotation.concurrent.GuardedBy;
+import java.util.Map;
+import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
@@ -37,6 +39,9 @@ public interface IDualKeyCache<FK, SK, V> {
/** Get the cache value with given first key and second key. */
V get(final FK firstKey, final SK secondKey);
+ <R> boolean batchApply(
+ final Map<FK, Map<SK, R>> inputMap, final BiFunction<V, R, Boolean>
mappingFunction);
+
/**
* Update the existing value. The updater shall return the difference caused by the update,
* because we do not want to call "valueSizeComputer" twice, which may include abundant useless
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
index f88be9c3366..ba7270ccafc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
@@ -76,6 +77,26 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK,
V>>
}
}
+ public <R> boolean batchApply(
+ final Map<FK, Map<SK, R>> inputMap, final BiFunction<V, R, Boolean>
mappingFunction) {
+ for (final Map.Entry<FK, Map<SK, R>> fkMapEntry : inputMap.entrySet()) {
+ final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(fkMapEntry.getKey());
+ if (cacheEntryGroup == null) {
+ return false;
+ }
+ for (final Map.Entry<SK, R> skrEntry : fkMapEntry.getValue().entrySet())
{
+ final T cacheEntry = cacheEntryGroup.getCacheEntry(skrEntry.getKey());
+ if (cacheEntry == null) {
+ return false;
+ }
+ if (!mappingFunction.apply(cacheEntry.getValue(),
skrEntry.getValue())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
@Override
public void update(
final FK firstKey,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
index 382ff1bfd5e..1f00d28dc59 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
@@ -217,6 +218,18 @@ public class TableDeviceCacheEntry {
return Objects.nonNull(cache) ? cache.getTimeValuePair(measurement) : null;
}
+ boolean updateInputMap(final @Nonnull Map<String, Pair<TSDataType,
TimeValuePair>> updateMap) {
+ // Shall only call this for original table device
+ for (final String measurement : updateMap.keySet()) {
+ final TimeValuePair result = getTimeValuePair(measurement);
+ if (result == null) {
+ return false;
+ }
+ updateMap.get(measurement).setRight(result);
+ }
+ return true;
+ }
+
// Shall pass in "" if last by time
Optional<Pair<OptionalLong, TsPrimitiveType[]>> getLastRow(
final String sourceMeasurement, final List<String> targetMeasurements) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
index 56577448859..b30413baa88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectN
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegion;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.read.TimeValuePair;
@@ -445,6 +446,11 @@ public class TableDeviceSchemaCache {
Objects.isNull(timeValuePairs));
}
+ public boolean getLastCache(
+ final Map<TableId, Map<IDeviceID, Map<String, Pair<TSDataType,
TimeValuePair>>>> inputMap) {
+ return dualKeyCache.batchApply(inputMap,
TableDeviceCacheEntry::updateInputMap);
+ }
+
// WARNING: This is not guaranteed to affect table model's cache
void invalidateLastCache(final PartialPath devicePath, final String
measurement) {
final ToIntFunction<TableDeviceCacheEntry> updateFunction =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableId.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableId.java
index f1af7d4ff44..45ecf7212ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableId.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableId.java
@@ -26,7 +26,7 @@ import javax.annotation.Nullable;
import java.util.Objects;
-class TableId {
+public class TableId {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TableId.class);
@@ -36,7 +36,7 @@ class TableId {
private final String tableName;
- TableId(final @Nullable String database, final @Nonnull String tableName) {
+ public TableId(final @Nullable String database, final @Nonnull String
tableName) {
this.database = database;
this.tableName = tableName;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
index 840a1630f1b..a9b002bbd6d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableId;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.DeleteTableDeviceNode;
@@ -55,6 +56,9 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IAlterLogica
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.ICreateLogicalViewPlan;
import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Pair;
import java.io.File;
@@ -337,6 +341,11 @@ public interface ISchemaRegion {
long countPathsUsingTemplate(int templateId, PathPatternTree patternTree)
throws MetadataException;
+ int fillLastQueryMap(
+ final PartialPath pattern,
+ final Map<TableId, Map<IDeviceID, Map<String, Pair<TSDataType,
TimeValuePair>>>> mapToFill)
+ throws MetadataException;
+
// endregion
// region table device management
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index d3df7df33d9..3f49dfec93d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -53,6 +53,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.Cre
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableId;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode;
@@ -128,8 +129,10 @@ import org.apache.iotdb.db.utils.SchemaUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.type.TypeFactory;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
@@ -193,12 +196,12 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
private boolean isRecovering = true;
private volatile boolean initialized = false;
- private final String storageGroupDirPath;
+ private final String databaseDirPath;
private final String schemaRegionDirPath;
// For table model db: without "root."
// For tree model db: with "root."
- private final String storageGroupFullPath;
+ private final String databaseFullPath;
private final SchemaRegionId schemaRegionId;
// the log file writer
@@ -219,11 +222,11 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
public SchemaRegionMemoryImpl(final ISchemaRegionParams schemaRegionParams)
throws MetadataException {
- storageGroupFullPath = schemaRegionParams.getDatabase();
+ databaseFullPath = schemaRegionParams.getDatabase();
this.schemaRegionId = schemaRegionParams.getSchemaRegionId();
- storageGroupDirPath = config.getSchemaDir() + File.separator +
storageGroupFullPath;
- schemaRegionDirPath = storageGroupDirPath + File.separator +
schemaRegionId.getId();
+ databaseDirPath = config.getSchemaDir() + File.separator +
databaseFullPath;
+ schemaRegionDirPath = databaseDirPath + File.separator +
schemaRegionId.getId();
// In ratis mode, no matter create schemaRegion or recover schemaRegion, the working dir should
// be clear first
@@ -236,7 +239,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
this.regionStatistics =
new MemSchemaRegionStatistics(
schemaRegionId.getId(),
schemaRegionParams.getSchemaEngineStatistics());
- this.metric = new SchemaRegionMemMetric(regionStatistics,
storageGroupFullPath);
+ this.metric = new SchemaRegionMemMetric(regionStatistics,
databaseFullPath);
init();
}
@@ -267,11 +270,11 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
deviceAttributeStore = new DeviceAttributeStore(regionStatistics);
deviceAttributeCacheUpdater =
new DeviceAttributeCacheUpdater(
- regionStatistics,
PathUtils.unQualifyDatabaseName(storageGroupFullPath));
+ regionStatistics,
PathUtils.unQualifyDatabaseName(databaseFullPath));
tagManager = new TagManager(schemaRegionDirPath, regionStatistics);
mTree =
new MTreeBelowSGMemoryImpl(
-
PartialPath.getQualifiedDatabasePartialPath(storageGroupFullPath),
+ PartialPath.getQualifiedDatabasePartialPath(databaseFullPath),
tagManager::readTags,
tagManager::readAttributes,
regionStatistics,
@@ -297,14 +300,14 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
}
private void initDir() throws SchemaDirCreationFailureException {
- final File sgSchemaFolder =
SystemFileFactory.INSTANCE.getFile(storageGroupDirPath);
+ final File sgSchemaFolder =
SystemFileFactory.INSTANCE.getFile(databaseDirPath);
if (!sgSchemaFolder.exists()) {
if (sgSchemaFolder.mkdirs()) {
- logger.info("create database schema folder {}", storageGroupDirPath);
+ logger.info("create database schema folder {}", databaseDirPath);
} else {
if (!sgSchemaFolder.exists()) {
- logger.error("create database schema folder {} failed.",
storageGroupDirPath);
- throw new SchemaDirCreationFailureException(storageGroupDirPath);
+ logger.error("create database schema folder {} failed.",
databaseDirPath);
+ throw new SchemaDirCreationFailureException(databaseDirPath);
}
}
}
@@ -394,11 +397,11 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
logger.debug(
"spend {} ms to deserialize {} mtree from mlog.bin",
System.currentTimeMillis() - time,
- storageGroupFullPath);
+ databaseFullPath);
return idx;
} catch (final Exception e) {
e.printStackTrace();
- throw new IOException("Failed to parse " + storageGroupFullPath + "
mlog.bin for err:" + e);
+ throw new IOException("Failed to parse " + databaseFullPath + "
mlog.bin for err:" + e);
}
} else {
return 0;
@@ -465,7 +468,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
@Override
public String getDatabaseFullPath() {
- return storageGroupFullPath;
+ return databaseFullPath;
}
@Override
@@ -570,7 +573,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
snapshotStartTime = System.currentTimeMillis();
deviceAttributeCacheUpdater =
- new DeviceAttributeCacheUpdater(regionStatistics,
storageGroupFullPath);
+ new DeviceAttributeCacheUpdater(regionStatistics, databaseFullPath);
deviceAttributeCacheUpdater.loadFromSnapshot(latestSnapshotRootDir);
logger.info(
"Device attribute remote updater snapshot loading of schemaRegion {}
costs {}ms.",
@@ -589,7 +592,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
mTree =
MTreeBelowSGMemoryImpl.loadFromSnapshot(
latestSnapshotRootDir,
- storageGroupFullPath,
+ databaseFullPath,
regionStatistics,
metric,
measurementMNode -> {
@@ -606,7 +609,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
} catch (final IOException e) {
logger.error(
"Failed to recover tagIndex for {} in schemaRegion {}.",
- storageGroupFullPath + PATH_SEPARATOR +
measurementMNode.getFullPath(),
+ databaseFullPath + PATH_SEPARATOR +
measurementMNode.getFullPath(),
schemaRegionId);
}
},
@@ -888,10 +891,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
throws SchemaQuotaExceededException {
final int notExistNum = mTree.getTableDeviceNotExistNum(tableName,
deviceIdList);
schemaQuotaManager.check(
- (long)
- DataNodeTableCache.getInstance()
- .getTable(storageGroupFullPath, tableName)
- .getFieldNum()
+ (long) DataNodeTableCache.getInstance().getTable(databaseFullPath,
tableName).getFieldNum()
* notExistNum,
notExistNum);
}
@@ -1408,11 +1408,19 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
return result;
}
+ @Override
+ public int fillLastQueryMap(
+ final PartialPath pattern,
+ final Map<TableId, Map<IDeviceID, Map<String, Pair<TSDataType,
TimeValuePair>>>> mapToFill)
+ throws MetadataException {
+ return mTree.fillLastQueryMap(pattern, mapToFill);
+ }
+
@Override
public void createOrUpdateTableDevice(final CreateOrUpdateTableDeviceNode
node)
throws MetadataException {
for (int i = 0; i < node.getDeviceIdList().size(); i++) {
- final String databaseName = storageGroupFullPath;
+ final String databaseName = databaseFullPath;
final String tableName = node.getTableName();
final String[] deviceId =
Arrays.stream(node.getDeviceIdList().get(i))
@@ -1603,14 +1611,14 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
throws MetadataException {
final List<PartialPath> paths =
DeleteDevice.constructPaths(
- storageGroupFullPath,
+ databaseFullPath,
constructTableDevicesBlackListNode.getTableName(),
constructTableDevicesBlackListNode.getPatternInfo());
final DeviceBlackListConstructor constructor =
DeleteDevice.constructDevicePredicateUpdater(
- storageGroupFullPath,
+ databaseFullPath,
DataNodeTableCache.getInstance()
- .getTable(storageGroupFullPath,
constructTableDevicesBlackListNode.getTableName()),
+ .getTable(databaseFullPath,
constructTableDevicesBlackListNode.getTableName()),
constructTableDevicesBlackListNode.getFilterInfo(),
(pointer, name) -> deviceAttributeStore.getAttributes(pointer,
name),
regionStatistics);
@@ -1631,7 +1639,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
throws MetadataException {
final List<PartialPath> paths =
DeleteDevice.constructPaths(
- PathUtils.unQualifyDatabaseName(storageGroupFullPath),
+ PathUtils.unQualifyDatabaseName(databaseFullPath),
rollbackTableDevicesBlackListNode.getTableName(),
rollbackTableDevicesBlackListNode.getPatternInfo());
for (final PartialPath pattern : paths) {
@@ -1646,7 +1654,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
throws MetadataException {
final List<PartialPath> paths =
DeleteDevice.constructPaths(
- PathUtils.unQualifyDatabaseName(storageGroupFullPath),
+ PathUtils.unQualifyDatabaseName(databaseFullPath),
rollbackTableDevicesBlackListNode.getTableName(),
rollbackTableDevicesBlackListNode.getPatternInfo());
for (final PartialPath pattern : paths) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index 85038d21908..6b8000ecc59 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import
org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableId;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.DeleteTableDeviceNode;
@@ -101,8 +102,10 @@ import
org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1480,6 +1483,13 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
throw new UnsupportedOperationException("TableModel does not support
PBTree yet.");
}
+ @Override
+ public int fillLastQueryMap(
+ PartialPath pattern,
+ Map<TableId, Map<IDeviceID, Map<String, Pair<TSDataType,
TimeValuePair>>>> mapToFill) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
@Override
public ISchemaReader<IDeviceSchemaInfo> getDeviceReader(IShowDevicesPlan
showDevicesPlan)
throws MetadataException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
index beeface4474..605dd3b7a28 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.db.exception.quota.ExceedQuotaException;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.DeviceAttributeUpdater;
import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.DeviceBlackListConstructor;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableId;
import org.apache.iotdb.db.schemaengine.metric.SchemaRegionMemMetric;
import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.IMemMNode;
@@ -77,8 +78,10 @@ import org.apache.iotdb.rpc.TSStatusCode;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -1137,6 +1140,37 @@ public class MTreeBelowSGMemoryImpl {
}
}
+ public int fillLastQueryMap(
+ final PartialPath prefixPath,
+ final Map<TableId, Map<IDeviceID, Map<String, Pair<TSDataType,
TimeValuePair>>>> mapToFill)
+ throws MetadataException {
+ final int[] sensorNum = {0};
+ try (final EntityUpdater<IMemMNode> updater =
+ new EntityUpdater<IMemMNode>(
+ rootNode, prefixPath, store, true, SchemaConstant.ALL_MATCH_SCOPE)
{
+
+ @Override
+ protected void updateEntity(final IDeviceMNode<IMemMNode> node) {
+ final Map<String, Pair<TSDataType, TimeValuePair>> measurementMap
= new HashMap<>();
+ for (final IMemMNode child : node.getChildren().values()) {
+ if (child instanceof IMeasurementMNode) {
+ measurementMap.put(
+ child.getName(),
+ new Pair<>(((IMeasurementMNode<?>) child).getDataType(),
null));
+ }
+ }
+ final IDeviceID deviceID = node.getPartialPath().getIDeviceID();
+ mapToFill
+ .computeIfAbsent(new TableId(null, deviceID.getTableName()), o
-> new HashMap<>())
+ .put(deviceID, measurementMap);
+ sensorNum[0] += measurementMap.size();
+ }
+ }) {
+ updater.update();
+ }
+ return sensorNum[0];
+ }
+
// Used for device query/fetch with filters during show device or table query
public ISchemaReader<IDeviceSchemaInfo> getTableDeviceReader(
final PartialPath pattern, final BiFunction<Integer, String, Binary>
attributeProvider)
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index f658017b600..aaa319f3500 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -347,6 +347,16 @@ struct TSLastDataQueryReq {
9: optional bool legalPathNodes
}
+struct TSFastLastDataQueryForOnePrefixPathReq {
+ 1: required i64 sessionId
+ 2: required list<string> prefixes
+ 3: optional i32 fetchSize
+ 4: required i64 statementId
+ 5: optional bool enableRedirectQuery
+ 6: optional bool jdbcQuery
+ 7: optional i64 timeout
+}
+
struct TSFastLastDataQueryForOneDeviceReq {
1: required i64 sessionId
2: required string db
@@ -549,6 +559,8 @@ service IClientRPCService {
TSExecuteStatementResp executeLastDataQueryV2(1:TSLastDataQueryReq req);
+ TSExecuteStatementResp
executeFastLastDataQueryForOnePrefixPath(1:TSFastLastDataQueryForOnePrefixPathReq
req);
+
TSExecuteStatementResp
executeFastLastDataQueryForOneDeviceV2(1:TSFastLastDataQueryForOneDeviceReq
req);
TSExecuteStatementResp executeAggregationQueryV2(1:TSAggregationQueryReq
req);