This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dd6f958ee98 Optimize FetchSchema requests (#16434)
dd6f958ee98 is described below
commit dd6f958ee98df8737d03c701531f4b7d4b6f3fe7
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Sep 19 11:22:31 2025 +0800
Optimize FetchSchema requests (#16434)
* add cache
* Optimize FetchSchema requests and Cache attribute strings
* update
* update wait time
* update DeviceSchemaRequestCache
* update TableDeviceSchemaFetcher
* update TableDeviceSchemaFetcher
* update DeviceSchemaRequestCache
* update DeviceSchemaRequestCache
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 28 +++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 12 +++
.../metadata/cache/DeviceSchemaRequestCache.java | 88 ++++++++++++++++++++++
.../metadata/fetcher/TableDeviceSchemaFetcher.java | 13 ++++
4 files changed, 141 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 58958ad4415..64b7c39aee1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -193,6 +193,12 @@ public class IoTDBConfig {
/** Minimum ratio of effective information in wal files */
private volatile double walMinEffectiveInfoRatio = 0.1;
+ /** Maximum number of pending device schema requests */
+ private volatile int deviceSchemaRequestCacheMaxSize = 500;
+
+ /** Maximum time in milliseconds a caller waits for an in-flight device schema request to finish */
+ private volatile int deviceSchemaRequestCacheWaitTimeMs = 20;
+
private volatile long dataNodeTableSchemaCacheSize = 1 << 20;
/**
@@ -1937,6 +1943,28 @@ public class IoTDBConfig {
this.dataNodeTableSchemaCacheSize = dataNodeTableSchemaCacheSize;
}
+ /**
+  * Returns the configured upper bound on the number of pending device schema requests.
+  *
+  * @return the current value of {@code deviceSchemaRequestCacheMaxSize}
+  */
+ public int getDeviceSchemaRequestCacheMaxSize() {
+   return this.deviceSchemaRequestCacheMaxSize;
+ }
+
+ /**
+  * Updates the upper bound on the number of pending device schema requests.
+  *
+  * <p>Negative values are ignored and leave the current setting untouched.
+  *
+  * @param deviceSchemaRequestCacheMaxSize the new bound; applied only when it is not negative
+  */
+ public void setDeviceSchemaRequestCacheMaxSize(int deviceSchemaRequestCacheMaxSize) {
+   if (deviceSchemaRequestCacheMaxSize >= 0) {
+     this.deviceSchemaRequestCacheMaxSize = deviceSchemaRequestCacheMaxSize;
+   }
+ }
+
+ /**
+  * Returns the configured wait time, in milliseconds, for the device schema request cache.
+  *
+  * @return the current value of {@code deviceSchemaRequestCacheWaitTimeMs}
+  */
+ public int getDeviceSchemaRequestCacheWaitTimeMs() {
+   return this.deviceSchemaRequestCacheWaitTimeMs;
+ }
+
+ /**
+  * Updates the wait time, in milliseconds, for the device schema request cache.
+  *
+  * <p>Negative values are ignored and leave the current setting untouched.
+  *
+  * @param deviceSchemaRequestCacheWaitTimeMs the new wait time; applied only when it is not
+  *     negative
+  */
+ public void setDeviceSchemaRequestCacheWaitTimeMs(int deviceSchemaRequestCacheWaitTimeMs) {
+   if (deviceSchemaRequestCacheWaitTimeMs >= 0) {
+     this.deviceSchemaRequestCacheWaitTimeMs = deviceSchemaRequestCacheWaitTimeMs;
+   }
+ }
+
public long getWalMemTableSnapshotThreshold() {
return walMemTableSnapshotThreshold;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 1f793f70aa2..af3d7410659 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -988,6 +988,18 @@ public class IoTDBDescriptor {
"data_node_table_schema_cache_max_size_in_bytes",
String.valueOf(conf.getDataNodeTableSchemaCacheSize()))));
+ conf.setDeviceSchemaRequestCacheMaxSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "device_schema_request_cache_max_size",
+ String.valueOf(conf.getDeviceSchemaRequestCacheMaxSize()))));
+
+ conf.setDeviceSchemaRequestCacheWaitTimeMs(
+ Integer.parseInt(
+ properties.getProperty(
+ "device_schema_request_cache_wait_time_ms",
+
String.valueOf(conf.getDeviceSchemaRequestCacheWaitTimeMs()))));
+
// Commons
commonDescriptor.loadCommonProps(properties);
commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/cache/DeviceSchemaRequestCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/cache/DeviceSchemaRequestCache.java
new file mode 100644
index 00000000000..121d3b515b7
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/cache/DeviceSchemaRequestCache.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.metadata.cache;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FetchDevice;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Binary;
+
+import java.util.Map;
+
+/**
+ * Singleton registry of pending device schema fetches, keyed by the {@link FetchDevice} statement
+ * that triggered them.
+ *
+ * <p>Callers obtain a shared {@link FetchMissingDeviceSchema} handle for a statement via {@link
+ * #getOrCreatePendingRequest(FetchDevice)}. The first caller to invoke {@link
+ * FetchMissingDeviceSchema#waitForQuery(long)} on a handle claims it and returns immediately;
+ * later callers block for a bounded time until {@link FetchMissingDeviceSchema#notifyFetchDone()}
+ * is called. The number of tracked handles is capped by the {@code
+ * deviceSchemaRequestCacheMaxSize} configuration value.
+ */
+public class DeviceSchemaRequestCache {
+  private static final DeviceSchemaRequestCache INSTANCE = new DeviceSchemaRequestCache();
+
+  /** Pending request handles, size-bounded by the configured maximum. */
+  private final Cache<FetchDevice, FetchMissingDeviceSchema> pendingRequests =
+      Caffeine.newBuilder()
+          .maximumSize(
+              IoTDBDescriptor.getInstance().getConfig().getDeviceSchemaRequestCacheMaxSize())
+          .build();
+
+  private DeviceSchemaRequestCache() {}
+
+  /**
+   * Returns the single shared instance of this cache.
+   *
+   * @return the singleton instance
+   */
+  public static DeviceSchemaRequestCache getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Returns the pending request handle registered for {@code statement}, creating and registering
+   * a fresh one when none is present.
+   *
+   * @param statement the fetch statement used as the cache key
+   * @return the existing or newly created handle for the statement
+   */
+  public FetchMissingDeviceSchema getOrCreatePendingRequest(FetchDevice statement) {
+    return pendingRequests.get(statement, k -> new FetchMissingDeviceSchema());
+  }
+
+  /**
+   * Drops the handle registered for {@code statement}, if any.
+   *
+   * @param statement the fetch statement whose handle should be removed
+   */
+  public void removeCompletedRequest(FetchDevice statement) {
+    pendingRequests.invalidate(statement);
+  }
+
+  /**
+   * Shared handle for one pending device schema fetch.
+   *
+   * <p>{@link #waitForQuery(long)} and {@link #notifyFetchDone()} synchronize on this object; the
+   * result is published through a volatile field.
+   */
+  public static class FetchMissingDeviceSchema {
+    private static final long NANOS_PER_MILLI = 1_000_000L;
+
+    private volatile Map<IDeviceID, Map<String, Binary>> result;
+    private volatile boolean done = false;
+    private volatile long queryId = -1;
+
+    /**
+     * Returns the fetched schema, or {@code null} when no result has been set on this handle.
+     *
+     * @return the stored result, possibly {@code null}
+     */
+    public Map<IDeviceID, Map<String, Binary>> getResult() {
+      return result;
+    }
+
+    /**
+     * Stores the fetched schema so that other holders of this handle can read it.
+     *
+     * @param result the fetched device schema
+     */
+    public void setResult(Map<IDeviceID, Map<String, Binary>> result) {
+      this.result = result;
+    }
+
+    /**
+     * Claims this handle for {@code queryId} when it is unclaimed; otherwise waits for the claiming
+     * query to finish.
+     *
+     * <p>The wait ends when {@link #notifyFetchDone()} has been called, when the configured {@code
+     * deviceSchemaRequestCacheWaitTimeMs} has elapsed, or when the thread is interrupted (the
+     * interrupt flag is restored). A wait time of zero means "do not wait".
+     *
+     * @param queryId the id of the calling query; recorded only when the handle is unclaimed
+     */
+    public synchronized void waitForQuery(long queryId) {
+      if (this.queryId == -1) {
+        // First caller: claim the handle and return immediately.
+        this.queryId = queryId;
+        return;
+      }
+
+      final long waitTimeMs =
+          IoTDBDescriptor.getInstance().getConfig().getDeviceSchemaRequestCacheWaitTimeMs();
+      if (waitTimeMs <= 0) {
+        // Object.wait(0) blocks until notified, which would turn "no wait" into an unbounded wait.
+        return;
+      }
+
+      final long deadlineNanos = System.nanoTime() + waitTimeMs * NANOS_PER_MILLI;
+      try {
+        // Loop so that a spurious wakeup does not end the wait before completion or timeout.
+        while (!done) {
+          final long remainingNanos = deadlineNanos - System.nanoTime();
+          if (remainingNanos <= 0) {
+            return;
+          }
+          // remainingNanos > 0 here, so this never degenerates into wait(0, 0).
+          this.wait(
+              remainingNanos / NANOS_PER_MILLI, (int) (remainingNanos % NANOS_PER_MILLI));
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    /** Marks the fetch as finished and wakes every thread blocked in {@link #waitForQuery(long)}. */
+    public synchronized void notifyFetchDone() {
+      done = true;
+      this.notifyAll();
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
index 643f5364faa..9b0bea05bce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedDeviceEntry;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.cache.DeviceSchemaRequestCache;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.IDeviceSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceNormalSchema;
@@ -80,6 +81,8 @@ public class TableDeviceSchemaFetcher {
private final TableDeviceSchemaCache cache =
TableDeviceSchemaCache.getInstance();
+ private final DeviceSchemaRequestCache requestCache =
DeviceSchemaRequestCache.getInstance();
+
private final TableDeviceCacheAttributeGuard attributeGuard =
new TableDeviceCacheAttributeGuard();
@@ -101,7 +104,14 @@ public class TableDeviceSchemaFetcher {
Map<IDeviceID, Map<String, Binary>> fetchMissingDeviceSchemaForDataInsertion(
final FetchDevice statement, final MPPQueryContext context) {
+ DeviceSchemaRequestCache.FetchMissingDeviceSchema schema =
+ requestCache.getOrCreatePendingRequest(statement);
+
final long queryId = SessionManager.getInstance().requestQueryId();
+ schema.waitForQuery(queryId);
+ if (schema.getResult() != null) {
+ return schema.getResult();
+ }
Throwable t = null;
final String database = statement.getDatabase();
@@ -165,6 +175,7 @@ public class TableDeviceSchemaFetcher {
}
}
+ schema.setResult(fetchedDeviceSchema);
fetchedDeviceSchema.forEach((key, value) ->
cache.putAttributes(database, key, value));
return fetchedDeviceSchema;
@@ -172,6 +183,8 @@ public class TableDeviceSchemaFetcher {
t = throwable;
throw throwable;
} finally {
+ schema.notifyFetchDone();
+ requestCache.removeCompletedRequest(statement);
queryIdSet.remove(queryId);
attributeGuard.tryUpdateCache();
coordinator.cleanupQueryExecution(queryId, null, t);