This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dd6f958ee98 Optimize FetchSchema requests (#16434)
dd6f958ee98 is described below

commit dd6f958ee98df8737d03c701531f4b7d4b6f3fe7
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Sep 19 11:22:31 2025 +0800

    Optimize FetchSchema requests (#16434)
    
    * add cache
    
    * Optimize FetchSchema requests and Cache attribute strings
    
    * update
    
    * update wait time
    
    * update DeviceSchemaRequestCache
    
    * update TableDeviceSchemaFetcher
    
    * update TableDeviceSchemaFetcher
    
    * update DeviceSchemaRequestCache
    
    * update DeviceSchemaRequestCache
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 28 +++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 12 +++
 .../metadata/cache/DeviceSchemaRequestCache.java   | 88 ++++++++++++++++++++++
 .../metadata/fetcher/TableDeviceSchemaFetcher.java | 13 ++++
 4 files changed, 141 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 58958ad4415..64b7c39aee1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -193,6 +193,12 @@ public class IoTDBConfig {
   /** Minimum ratio of effective information in wal files */
   private volatile double walMinEffectiveInfoRatio = 0.1;
 
+  /** Maximum number of pending device schema requests */
+  private volatile int deviceSchemaRequestCacheMaxSize = 500;
+
+  /** Wait time in milliseconds for device schema request cache */
+  private volatile int deviceSchemaRequestCacheWaitTimeMs = 20;
+
   private volatile long dataNodeTableSchemaCacheSize = 1 << 20;
 
   /**
@@ -1937,6 +1943,28 @@ public class IoTDBConfig {
     this.dataNodeTableSchemaCacheSize = dataNodeTableSchemaCacheSize;
   }
 
+  public int getDeviceSchemaRequestCacheMaxSize() {
+    return deviceSchemaRequestCacheMaxSize;
+  }
+
+  public void setDeviceSchemaRequestCacheMaxSize(int 
deviceSchemaRequestCacheMaxSize) {
+    if (deviceSchemaRequestCacheMaxSize < 0) {
+      return;
+    }
+    this.deviceSchemaRequestCacheMaxSize = deviceSchemaRequestCacheMaxSize;
+  }
+
+  public int getDeviceSchemaRequestCacheWaitTimeMs() {
+    return deviceSchemaRequestCacheWaitTimeMs;
+  }
+
+  public void setDeviceSchemaRequestCacheWaitTimeMs(int 
deviceSchemaRequestCacheWaitTimeMs) {
+    if (deviceSchemaRequestCacheWaitTimeMs < 0) {
+      return;
+    }
+    this.deviceSchemaRequestCacheWaitTimeMs = 
deviceSchemaRequestCacheWaitTimeMs;
+  }
+
   public long getWalMemTableSnapshotThreshold() {
     return walMemTableSnapshotThreshold;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 1f793f70aa2..af3d7410659 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -988,6 +988,18 @@ public class IoTDBDescriptor {
                 "data_node_table_schema_cache_max_size_in_bytes",
                 String.valueOf(conf.getDataNodeTableSchemaCacheSize()))));
 
+    conf.setDeviceSchemaRequestCacheMaxSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "device_schema_request_cache_max_size",
+                String.valueOf(conf.getDeviceSchemaRequestCacheMaxSize()))));
+
+    conf.setDeviceSchemaRequestCacheWaitTimeMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "device_schema_request_cache_wait_time_ms",
+                
String.valueOf(conf.getDeviceSchemaRequestCacheWaitTimeMs()))));
+
     // Commons
     commonDescriptor.loadCommonProps(properties);
     commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/cache/DeviceSchemaRequestCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/cache/DeviceSchemaRequestCache.java
new file mode 100644
index 00000000000..121d3b515b7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/cache/DeviceSchemaRequestCache.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.metadata.cache;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FetchDevice;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Binary;
+
+import java.util.Map;
+
+public class DeviceSchemaRequestCache {
+  private static final DeviceSchemaRequestCache INSTANCE = new 
DeviceSchemaRequestCache();
+
+  private final Cache<FetchDevice, FetchMissingDeviceSchema> pendingRequests =
+      Caffeine.newBuilder()
+          .maximumSize(
+              
IoTDBDescriptor.getInstance().getConfig().getDeviceSchemaRequestCacheMaxSize())
+          .build();
+
+  private DeviceSchemaRequestCache() {}
+
+  public static DeviceSchemaRequestCache getInstance() {
+    return INSTANCE;
+  }
+
+  public FetchMissingDeviceSchema getOrCreatePendingRequest(FetchDevice 
statement) {
+    return pendingRequests.get(statement, k -> new FetchMissingDeviceSchema());
+  }
+
+  public void removeCompletedRequest(FetchDevice statement) {
+    pendingRequests.invalidate(statement);
+  }
+
+  public static class FetchMissingDeviceSchema {
+    private volatile Map<IDeviceID, Map<String, Binary>> result;
+    private volatile boolean done = false;
+    private volatile long queryId = -1;
+
+    public Map<IDeviceID, Map<String, Binary>> getResult() {
+      return result;
+    }
+
+    public void setResult(Map<IDeviceID, Map<String, Binary>> result) {
+      this.result = result;
+    }
+
+    public synchronized void waitForQuery(long queryId) {
+      if (this.queryId != -1) {
+        if (!done) {
+          try {
+            this.wait(
+                
IoTDBDescriptor.getInstance().getConfig().getDeviceSchemaRequestCacheWaitTimeMs());
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      } else {
+        this.queryId = queryId;
+      }
+    }
+
+    public synchronized void notifyFetchDone() {
+      done = true;
+      this.notifyAll();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
index 643f5364faa..9b0bea05bce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedDeviceEntry;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.cache.DeviceSchemaRequestCache;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.IDeviceSchema;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceNormalSchema;
@@ -80,6 +81,8 @@ public class TableDeviceSchemaFetcher {
 
   private final TableDeviceSchemaCache cache = 
TableDeviceSchemaCache.getInstance();
 
+  private final DeviceSchemaRequestCache requestCache = 
DeviceSchemaRequestCache.getInstance();
+
   private final TableDeviceCacheAttributeGuard attributeGuard =
       new TableDeviceCacheAttributeGuard();
 
@@ -101,7 +104,14 @@ public class TableDeviceSchemaFetcher {
 
   Map<IDeviceID, Map<String, Binary>> fetchMissingDeviceSchemaForDataInsertion(
       final FetchDevice statement, final MPPQueryContext context) {
+    DeviceSchemaRequestCache.FetchMissingDeviceSchema schema =
+        requestCache.getOrCreatePendingRequest(statement);
+
     final long queryId = SessionManager.getInstance().requestQueryId();
+    schema.waitForQuery(queryId);
+    if (schema.getResult() != null) {
+      return schema.getResult();
+    }
     Throwable t = null;
 
     final String database = statement.getDatabase();
@@ -165,6 +175,7 @@ public class TableDeviceSchemaFetcher {
         }
       }
 
+      schema.setResult(fetchedDeviceSchema);
       fetchedDeviceSchema.forEach((key, value) -> 
cache.putAttributes(database, key, value));
 
       return fetchedDeviceSchema;
@@ -172,6 +183,8 @@ public class TableDeviceSchemaFetcher {
       t = throwable;
       throw throwable;
     } finally {
+      schema.notifyFetchDone();
+      requestCache.removeCompletedRequest(statement);
       queryIdSet.remove(queryId);
       attributeGuard.tryUpdateCache();
       coordinator.cleanupQueryExecution(queryId, null, t);

Reply via email to