This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 205e373a013 Refactored the schema cache (#16446)
205e373a013 is described below

commit 205e373a013f77192b858f4f5ef311c13e8d5178
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 22 10:03:40 2025 +0800

    Refactored the schema cache (#16446)
    
    * partial
    
    * Update pom.xml
---
 .../common/schematree/DeviceSchemaInfo.java        |  12 +-
 .../schema/dualkeycache/impl/DualKeyCacheImpl.java |   2 +
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |   4 +-
 .../plan/planner/OperatorTreeGenerator.java        |  10 +-
 .../fetcher/cache/TableDeviceLastCache.java        |   4 +
 .../cache/TreeDeviceSchemaCacheManager.java        | 155 ++++++++++++---------
 .../cache/TreeDeviceSchemaCacheManagerTest.java    |  19 ++-
 7 files changed, 112 insertions(+), 94 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
index 2c52b6b6770..ead892f3e62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
@@ -30,16 +30,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
-
 public class DeviceSchemaInfo {
 
-  private PartialPath devicePath;
-  private boolean isAligned;
-  private List<IMeasurementSchemaInfo> measurementSchemaInfoList;
-  private int templateId = NON_TEMPLATE;
-
-  private DeviceSchemaInfo() {}
+  private final PartialPath devicePath;
+  private final boolean isAligned;
+  private final List<IMeasurementSchemaInfo> measurementSchemaInfoList;
+  private final int templateId;
 
   public DeviceSchemaInfo(
       PartialPath devicePath,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
index 841444f4279..5e614fa5fd3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
@@ -224,6 +224,8 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, 
V>>
     final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
         firstKeyMap.get(belongedGroup.getFirstKey());
     if (Objects.nonNull(cacheEntryGroup) && cacheEntryGroup.isEmpty()) {
+      // The removal is non-atomic, but it's ok because it's just a cache and 
does not affect the
+      // consistency if you evicts some entries being added
       if (Objects.nonNull(firstKeyMap.remove(belongedGroup.getFirstKey()))) {
         memory +=
             sizeComputer.computeFirstKeySize(belongedGroup.getFirstKey())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
index 5dca594eaaf..49e2995b3a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -125,7 +125,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher 
{
       Set<String> storageGroupSet = new HashSet<>();
       if (!explicitDevicePatternList.isEmpty()) {
         for (PartialPath explicitDevicePattern : explicitDevicePatternList) {
-          cachedSchema = 
schemaCache.getMatchedSchemaWithTemplate(explicitDevicePattern);
+          cachedSchema = 
schemaCache.getMatchedTemplateSchema(explicitDevicePattern);
           if (cachedSchema.isEmpty()) {
             isAllCached = false;
             break;
@@ -144,7 +144,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher 
{
             continue;
           }
           cachedSchema =
-              schemaCache.getMatchedSchemaWithoutTemplate(new 
MeasurementPath(fullPath.getNodes()));
+              schemaCache.getMatchedNormalSchema(new 
MeasurementPath(fullPath.getNodes()));
           if (cachedSchema.isEmpty()) {
             isAllCached = false;
             break;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 2bc2c419412..0c36104055f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2800,12 +2800,11 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     final boolean isNeedUpdateLastCache = context.isNeedUpdateLastCache();
     if (isNeedUpdateLastCache) {
       TreeDeviceSchemaCacheManager.getInstance()
-          .updateLastCache(
+          .declareLastCache(
               ((DataDriverContext) operatorContext.getDriverContext())
                   .getDataRegion()
                   .getDatabaseName(),
-              fullPath,
-              false);
+              fullPath);
     }
 
     return Objects.isNull(node.getOutputViewPath())
@@ -2857,13 +2856,12 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
 
       for (int i = 0; i < size; ++i) {
         TreeDeviceSchemaCacheManager.getInstance()
-            .updateLastCache(
+            .declareLastCache(
                 databaseName,
                 new MeasurementPath(
                     
devicePath.concatNode(unCachedPath.getMeasurementList().get(i)),
                     unCachedPath.getSchemaList().get(i),
-                    true),
-                false);
+                    true));
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
index 16f5e208f9d..62dbfdccdeb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
@@ -82,8 +82,12 @@ public class TableDeviceLastCache {
 
   private static final Optional<Pair<OptionalLong, TsPrimitiveType[]>> 
HIT_AND_ALL_NULL =
       Optional.of(new Pair<>(OptionalLong.empty(), null));
+
+  /** This means that the tv pair has been put, and the value is null */
   public static final TimeValuePair EMPTY_TIME_VALUE_PAIR =
       new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);
+
+  /** This means that the tv pair has been declared, and is ready for the next 
put. */
   private static final TimeValuePair PLACEHOLDER_TIME_VALUE_PAIR =
       new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
index f38fbefa378..0a38501c0f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
@@ -128,14 +128,14 @@ public class TreeDeviceSchemaCacheManager {
    * @param devicePath full path of the device
    * @return empty if cache miss or the device path is not a template 
activated path
    */
-  public ClusterSchemaTree getMatchedSchemaWithTemplate(final PartialPath 
devicePath) {
+  public ClusterSchemaTree getMatchedTemplateSchema(final PartialPath 
devicePath) {
     final ClusterSchemaTree tree = new ClusterSchemaTree();
     final IDeviceSchema schema = 
tableDeviceSchemaCache.getDeviceSchema(devicePath.getNodes());
     if (!(schema instanceof TreeDeviceTemplateSchema)) {
       return tree;
     }
     final TreeDeviceTemplateSchema treeSchema = (TreeDeviceTemplateSchema) 
schema;
-    Template template = 
templateManager.getTemplate(treeSchema.getTemplateId());
+    final Template template = 
templateManager.getTemplate(treeSchema.getTemplateId());
     tree.appendTemplateDevice(devicePath, template.isDirectAligned(), 
template.getId(), template);
     tree.setDatabases(Collections.singleton(treeSchema.getDatabase()));
     return tree;
@@ -147,7 +147,7 @@ public class TreeDeviceSchemaCacheManager {
    * @param fullPath full path
    * @return empty if cache miss
    */
-  public ClusterSchemaTree getMatchedSchemaWithoutTemplate(final PartialPath 
fullPath) {
+  public ClusterSchemaTree getMatchedNormalSchema(final PartialPath fullPath) {
     final ClusterSchemaTree tree = new ClusterSchemaTree();
     final IDeviceSchema schema =
         tableDeviceSchemaCache.getDeviceSchema(
@@ -283,60 +283,7 @@ public class TreeDeviceSchemaCacheManager {
         continue;
       }
       final IMeasurementSchema schema = templateSchema.get(measurements[i]);
-      computation.computeMeasurement(
-          i,
-          new IMeasurementSchemaInfo() {
-            @Override
-            public String getName() {
-              return schema.getMeasurementName();
-            }
-
-            @Override
-            public IMeasurementSchema getSchema() {
-              if (isLogicalView()) {
-                return new LogicalViewSchema(
-                    schema.getMeasurementName(), ((LogicalViewSchema) 
schema).getExpression());
-              } else {
-                return this.getSchemaAsMeasurementSchema();
-              }
-            }
-
-            @Override
-            public MeasurementSchema getSchemaAsMeasurementSchema() {
-              return new MeasurementSchema(
-                  schema.getMeasurementName(),
-                  schema.getType(),
-                  schema.getEncodingType(),
-                  schema.getCompressor());
-            }
-
-            @Override
-            public LogicalViewSchema getSchemaAsLogicalViewSchema() {
-              throw new RuntimeException(
-                  new UnsupportedOperationException(
-                      "Function getSchemaAsLogicalViewSchema is not supported 
in DeviceUsingTemplateSchemaCache."));
-            }
-
-            @Override
-            public Map<String, String> getTagMap() {
-              return null;
-            }
-
-            @Override
-            public Map<String, String> getAttributeMap() {
-              return null;
-            }
-
-            @Override
-            public String getAlias() {
-              return null;
-            }
-
-            @Override
-            public boolean isLogicalView() {
-              return schema.isLogicalView();
-            }
-          });
+      computation.computeMeasurement(i, new WrappedSchemaInfo(schema));
     }
     return indexOfMissingMeasurements;
   }
@@ -399,10 +346,10 @@ public class TreeDeviceSchemaCacheManager {
    *
    * <p>Note: The query shall put the {@link TableDeviceLastCache} twice:
    *
-   * <p>- First time set the "isCommit" to {@code false} before the query 
accesses data. It is just
-   * to allow the writing to update the cache, then avoid that the query put a 
stale value to cache
-   * and break the consistency. WARNING: The writing may temporarily put a 
stale value in cache if a
-   * stale value is written, but it won't affect the eventual consistency.
+   * <p>- First time call this before the query accesses data. It is just to 
allow the writing to
+   * update the cache, then avoid that the query put a stale value to cache 
and break the
+   * consistency. WARNING: The writing may temporarily put a stale value in 
cache if a stale value
+   * is written, but it won't affect the eventual consistency.
    *
    * <p>- Second time put the calculated {@link TimeValuePair}, and use {@link
    * #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[], 
boolean,
@@ -410,22 +357,35 @@ public class TreeDeviceSchemaCacheManager {
    * if the measurement is with all {@code null}s, its {@link TimeValuePair} 
shall be {@link
    * TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method is not supposed 
to update time column.
    *
+   * @param database the device's database, WITH "root"
+   * @param measurementPath the fetched {@link MeasurementPath}
+   */
+  public void declareLastCache(final String database, final MeasurementPath 
measurementPath) {
+    tableDeviceSchemaCache.updateLastCache(
+        database,
+        measurementPath.getIDeviceID(),
+        new String[] {measurementPath.getMeasurement()},
+        null,
+        measurementPath.isUnderAlignedEntity(),
+        new IMeasurementSchema[] {measurementPath.getMeasurementSchema()},
+        true);
+  }
+
+  /**
+   * Update the {@link TableDeviceLastCache} on query in tree model.
+   *
    * <p>If the query has ended abnormally, it shall call this to invalidate 
the entry it has pushed
-   * in the first time, to avoid the stale writing damaging the eventual 
consistency. In this case
-   * and the "isInvalidate" shall be {@code true}.
+   * in the first time, to avoid the stale writing damaging the eventual 
consistency.
    *
    * @param database the device's database, WITH "root"
    * @param measurementPath the fetched {@link MeasurementPath}
-   * @param isInvalidate {@code true} if invalidate the first pushed cache, or 
{@code null} for the
-   *     first fetch.
    */
-  public void updateLastCache(
-      final String database, final MeasurementPath measurementPath, final 
boolean isInvalidate) {
+  public void invalidateLastCache(final String database, final MeasurementPath 
measurementPath) {
     tableDeviceSchemaCache.updateLastCache(
         database,
         measurementPath.getIDeviceID(),
         new String[] {measurementPath.getMeasurement()},
-        isInvalidate ? new TimeValuePair[] {null} : null,
+        new TimeValuePair[] {null},
         measurementPath.isUnderAlignedEntity(),
         new IMeasurementSchema[] {measurementPath.getMeasurementSchema()},
         true);
@@ -446,4 +406,63 @@ public class TreeDeviceSchemaCacheManager {
   public void cleanUp() {
     tableDeviceSchemaCache.invalidateAll();
   }
+
+  private static class WrappedSchemaInfo implements IMeasurementSchemaInfo {
+    private final IMeasurementSchema schema;
+
+    public WrappedSchemaInfo(final IMeasurementSchema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public String getName() {
+      return schema.getMeasurementName();
+    }
+
+    @Override
+    public IMeasurementSchema getSchema() {
+      if (isLogicalView()) {
+        return new LogicalViewSchema(
+            schema.getMeasurementName(), ((LogicalViewSchema) 
schema).getExpression());
+      } else {
+        return this.getSchemaAsMeasurementSchema();
+      }
+    }
+
+    @Override
+    public MeasurementSchema getSchemaAsMeasurementSchema() {
+      return new MeasurementSchema(
+          schema.getMeasurementName(),
+          schema.getType(),
+          schema.getEncodingType(),
+          schema.getCompressor());
+    }
+
+    @Override
+    public LogicalViewSchema getSchemaAsLogicalViewSchema() {
+      throw new RuntimeException(
+          new UnsupportedOperationException(
+              "Function getSchemaAsLogicalViewSchema is not supported in 
DeviceUsingTemplateSchemaCache."));
+    }
+
+    @Override
+    public Map<String, String> getTagMap() {
+      return null;
+    }
+
+    @Override
+    public Map<String, String> getAttributeMap() {
+      return null;
+    }
+
+    @Override
+    public String getAlias() {
+      return null;
+    }
+
+    @Override
+    public boolean isLogicalView() {
+      return schema.isLogicalView();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
index 446d0cd6587..b166e3d3772 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
@@ -197,10 +197,10 @@ public class TreeDeviceSchemaCacheManagerTest {
 
     final TimeValuePair tv1 = new TimeValuePair(1, new 
TsPrimitiveType.TsInt(1));
 
-    treeDeviceSchemaCacheManager.updateLastCache(
-        database, new MeasurementPath(device.concatNode("s1"), s1), false);
-    treeDeviceSchemaCacheManager.updateLastCache(
-        database, new MeasurementPath(device.concatNode("s3"), s3), false);
+    treeDeviceSchemaCacheManager.declareLastCache(
+        database, new MeasurementPath(device.concatNode("s1"), s1));
+    treeDeviceSchemaCacheManager.declareLastCache(
+        database, new MeasurementPath(device.concatNode("s3"), s3));
 
     // Simulate "s1" revert when the query has failed in calculation
     treeDeviceSchemaCacheManager.updateLastCacheIfExists(
@@ -213,8 +213,8 @@ public class TreeDeviceSchemaCacheManagerTest {
         },
         false,
         new MeasurementSchema[] {s1});
-    treeDeviceSchemaCacheManager.updateLastCache(
-        database, new MeasurementPath(device.concatNode("s1"), s1), true);
+    treeDeviceSchemaCacheManager.invalidateLastCache(
+        database, new MeasurementPath(device.concatNode("s1"), s1));
 
     // "s2" shall be null since the "null" timeValuePair has not been put
     treeDeviceSchemaCacheManager.updateLastCacheIfExists(
@@ -294,12 +294,11 @@ public class TreeDeviceSchemaCacheManagerTest {
         new MeasurementPath("root.sg1.d3.s1", TSDataType.FLOAT));
     treeDeviceSchemaCacheManager.put(clusterSchemaTree);
     final ClusterSchemaTree d1Tree =
-        treeDeviceSchemaCacheManager.getMatchedSchemaWithTemplate(new 
PartialPath("root.sg1.d1"));
+        treeDeviceSchemaCacheManager.getMatchedTemplateSchema(new 
PartialPath("root.sg1.d1"));
     final ClusterSchemaTree d2Tree =
-        treeDeviceSchemaCacheManager.getMatchedSchemaWithTemplate(new 
PartialPath("root.sg1.d2"));
+        treeDeviceSchemaCacheManager.getMatchedTemplateSchema(new 
PartialPath("root.sg1.d2"));
     final ClusterSchemaTree d3Tree =
-        treeDeviceSchemaCacheManager.getMatchedSchemaWithoutTemplate(
-            new MeasurementPath("root.sg1.d3.s1"));
+        treeDeviceSchemaCacheManager.getMatchedNormalSchema(new 
MeasurementPath("root.sg1.d3.s1"));
     List<MeasurementPath> measurementPaths = 
d1Tree.searchMeasurementPaths(ALL_MATCH_PATTERN).left;
     Assert.assertEquals(2, measurementPaths.size());
     for (final MeasurementPath measurementPath : measurementPaths) {

Reply via email to