This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 6a2178f87d2 [To dev/1.3] Fixed the schema cache calculation 2  & The 
potential NPE caused by concurrent invalidate and update (#16834) & The schema 
cache is not cleared for "clear schema cache"  (#16833)
6a2178f87d2 is described below

commit 6a2178f87d257834af467cac648161d5e7e2018d
Author: Caideyipi <[email protected]>
AuthorDate: Tue Dec 2 12:20:19 2025 +0800

    [To dev/1.3] Fixed the schema cache calculation 2  & The potential NPE 
caused by concurrent invalidate and update (#16834) & The schema cache is not 
cleared for "clear schema cache"  (#16833)
    
    * fix
    
    * fix
    
    * ffff
    
    * 13-fix
    
    * fix
    
    * fix
    
    * refactor
---
 .../impl/DataNodeInternalRPCServiceImpl.java       |  7 ++-
 .../analyze/cache/schema/DeviceCacheEntry.java     |  6 ++-
 .../plan/analyze/cache/schema/DeviceLastCache.java | 46 ++++++++++------
 .../analyze/cache/schema/DeviceNormalSchema.java   |  4 +-
 .../analyze/cache/schema/DeviceSchemaCache.java    |  4 +-
 .../dualkeycache/impl/CacheEntryGroupImpl.java     |  6 +++
 .../schema/dualkeycache/impl/DualKeyCacheImpl.java | 61 +++++++++-------------
 .../schema/dualkeycache/impl/ICacheEntryGroup.java |  3 ++
 .../db/queryengine/plan/parser/ASTVisitor.java     |  4 +-
 .../db/metadata/cache/DataNodeSchemaCacheTest.java | 13 +++++
 .../commons/schema/cache/CacheClearOptions.java    |  2 +-
 11 files changed, 89 insertions(+), 67 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 9d837b6f8fb..7d25b5c0d35 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1974,16 +1974,15 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
           || options.contains(CacheClearOptions.QUERY)) {
         storageEngine.clearCache();
       }
-      if (options.contains(CacheClearOptions.QUERY)
-          && options.contains(CacheClearOptions.TREE_SCHEMA)) {
+      if (options.contains(CacheClearOptions.QUERY) && 
options.contains(CacheClearOptions.SCHEMA)) {
         schemaCache.getDeviceSchemaCache().invalidateAll();
         return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
       }
       if (options.contains(CacheClearOptions.QUERY)) {
         schemaCache.getDeviceSchemaCache().invalidateLastCache();
       }
-      if (options.contains(CacheClearOptions.TREE_SCHEMA)) {
-        schemaCache.getDeviceSchemaCache().invalidateTreeSchema();
+      if (options.contains(CacheClearOptions.SCHEMA)) {
+        schemaCache.getDeviceSchemaCache().invalidateSchema();
       }
     } catch (final Exception e) {
       return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java
index a12b9c7661b..3337e2f1420 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java
@@ -97,8 +97,10 @@ public class DeviceCacheEntry {
     final AtomicInteger size = new AtomicInteger(0);
     deviceSchema.updateAndGet(
         schema -> {
-          size.set(schema.estimateSize());
-          return schema;
+          if (Objects.nonNull(schema)) {
+            size.set(schema.estimateSize());
+          }
+          return null;
         });
     return size.get();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java
index 36fc39c3286..9ebc1fc413c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java
@@ -99,12 +99,12 @@ public class DeviceLastCache {
           (measurementKey, tvPair) -> {
             if (Objects.isNull(newPair)) {
               diff.addAndGet(
-                  -((int) RamUsageEstimator.sizeOf(measurement) + 
getTVPairEntrySize(tvPair)));
+                  -((int) RamUsageEstimator.sizeOf(measurement) + 
getTvPairEntrySize(tvPair)));
               return null;
             }
             if (Objects.isNull(tvPair)) {
               diff.addAndGet(
-                  (int) RamUsageEstimator.sizeOf(measurement) + 
getTVPairEntrySize(newPair));
+                  (int) RamUsageEstimator.sizeOf(measurement) + 
getTvPairEntrySize(newPair));
               return newPair;
             }
             return tvPair;
@@ -128,7 +128,9 @@ public class DeviceLastCache {
     for (int i = 0; i < measurements.length; ++i) {
       if (Objects.isNull(timeValuePairs[i])) {
         if (invalidateNull) {
-          measurement2CachedLastMap.remove(measurements[i]);
+          diff.addAndGet(
+              -((int) RamUsageEstimator.sizeOf(measurements[i])
+                  + 
getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i]))));
         }
         continue;
       }
@@ -164,7 +166,7 @@ public class DeviceLastCache {
     measurement2CachedLastMap.computeIfPresent(
         measurement,
         (s, timeValuePair) -> {
-          diff.set((int) RamUsageEstimator.sizeOf(s) + 
getTVPairEntrySize(timeValuePair));
+          diff.set((int) RamUsageEstimator.sizeOf(s) + 
getTvPairEntrySize(timeValuePair));
           time.set(timeValuePair.getTimestamp());
           return null;
         });
@@ -175,7 +177,7 @@ public class DeviceLastCache {
         "",
         (s, timeValuePair) -> {
           if (timeValuePair.getTimestamp() <= time.get()) {
-            diff.addAndGet(getTVPairEntrySize(timeValuePair));
+            diff.addAndGet((int) RamUsageEstimator.sizeOf(s) + 
getTvPairEntrySize(timeValuePair));
             return null;
           }
           return timeValuePair;
@@ -184,13 +186,18 @@ public class DeviceLastCache {
     return diff.get();
   }
 
-  private int getTVPairEntrySize(final TimeValuePair tvPair) {
-    return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
-        + ((Objects.isNull(tvPair)
-                || tvPair == PLACEHOLDER_TIME_VALUE_PAIR
-                || tvPair == EMPTY_TIME_VALUE_PAIR)
-            ? 0
-            : tvPair.getSize());
+  private static int getTvPairEntrySize(final TimeValuePair tvPair) {
+    return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + 
getTvPairSize(tvPair);
+  }
+
+  private static int getTvPairSize(final TimeValuePair tvPair) {
+    return isEmptyTvPair(tvPair) ? 0 : tvPair.getSize();
+  }
+
+  private static boolean isEmptyTvPair(final TimeValuePair tvPair) {
+    return Objects.isNull(tvPair)
+        || tvPair == PLACEHOLDER_TIME_VALUE_PAIR
+        || tvPair == EMPTY_TIME_VALUE_PAIR;
   }
 
   @Nullable
@@ -202,16 +209,21 @@ public class DeviceLastCache {
   int estimateSize() {
     return INSTANCE_SIZE
         + (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY * 
measurement2CachedLastMap.size()
-        + measurement2CachedLastMap.values().stream()
-            .mapToInt(TimeValuePair::getSize)
+        + measurement2CachedLastMap.entrySet().stream()
+            .mapToInt(
+                entry ->
+                    (int) RamUsageEstimator.sizeOf(entry.getKey())
+                        + DeviceLastCache.getTvPairSize(entry.getValue()))
             .reduce(0, Integer::sum);
   }
 
   private static int getDiffSize(
       final TimeValuePair oldTimeValuePair, final TimeValuePair 
newTimeValuePair) {
-    if (oldTimeValuePair == EMPTY_TIME_VALUE_PAIR
-        || oldTimeValuePair == PLACEHOLDER_TIME_VALUE_PAIR) {
-      return newTimeValuePair.getSize();
+    if (isEmptyTvPair(oldTimeValuePair)) {
+      return getTvPairSize(newTimeValuePair);
+    }
+    if (isEmptyTvPair(newTimeValuePair)) {
+      return -getTvPairSize(oldTimeValuePair);
     }
     final TsPrimitiveType oldValue = oldTimeValuePair.getValue();
     final TsPrimitiveType newValue = newTimeValuePair.getValue();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java
index 5ce2ca06fcd..6d50905bbca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java
@@ -99,13 +99,13 @@ public class DeviceNormalSchema implements IDeviceSchema {
   public int estimateSize() {
     // Do not need to calculate database because it is interned
     return INSTANCE_SIZE
+        + measurementMap.size() * (int) 
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
         + measurementMap.entrySet().stream()
             .mapToInt(
                 entry ->
                     Math.toIntExact(
                         RamUsageEstimator.sizeOf(entry.getKey())
-                            + SchemaCacheEntry.estimateSize(entry.getValue())
-                            + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY))
+                            + SchemaCacheEntry.estimateSize(entry.getValue())))
             .reduce(0, Integer::sum);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
index 8d66cb3d619..f13e2030cf9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
@@ -229,7 +229,7 @@ public class DeviceSchemaCache {
     return dualKeyCache.stats().requestCount();
   }
 
-  long getMemoryUsage() {
+  public long getMemoryUsage() {
     return dualKeyCache.stats().memoryUsage();
   }
 
@@ -284,7 +284,7 @@ public class DeviceSchemaCache {
     }
   }
 
-  public void invalidateTreeSchema() {
+  public void invalidateSchema() {
     lock.lock();
     try {
       dualKeyCache.update(segment -> true, device -> true, entry -> 
-entry.invalidateSchema());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
index 54c53a15d50..ea7f502a90b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
@@ -72,6 +72,12 @@ public class CacheEntryGroupImpl<FK, SK, V, T extends 
ICacheEntry<SK, V>>
     return cacheEntryMap.compute(secondKey, computation.apply(memory));
   }
 
+  @Override
+  public T computeCacheEntryIfPresent(
+      final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>> 
computation) {
+    return cacheEntryMap.computeIfPresent(secondKey, 
computation.apply(memory));
+  }
+
   @Override
   public long removeCacheEntry(final SK secondKey) {
     final T result = cacheEntryMap.remove(secondKey);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
index 86d10422249..1778920a23a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
@@ -144,7 +144,28 @@ class DualKeyCacheImpl<FK, SK, V, T extends 
ICacheEntry<SK, V>>
   @Override
   public void update(
       final FK firstKey, final Predicate<SK> secondKeyChecker, final 
ToIntFunction<V> updater) {
-    final ICacheEntryGroup<FK, SK, V, T> entryGroup = 
firstKeyMap.get(firstKey);
+    clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater);
+    mayEvict();
+  }
+
+  @Override
+  public void update(
+      final Predicate<FK> firstKeyChecker,
+      final Predicate<SK> secondKeyChecker,
+      final ToIntFunction<V> updater) {
+    for (final FK firstKey : firstKeyMap.getAllKeys()) {
+      if (!firstKeyChecker.test(firstKey)) {
+        continue;
+      }
+      clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater);
+    }
+    mayEvict();
+  }
+
+  public void clearSecondEntry(
+      final ICacheEntryGroup<FK, SK, V, T> entryGroup,
+      final Predicate<SK> secondKeyChecker,
+      final ToIntFunction<V> updater) {
     if (Objects.nonNull(entryGroup)) {
       entryGroup
           .getAllCacheEntries()
@@ -153,49 +174,15 @@ class DualKeyCacheImpl<FK, SK, V, T extends 
ICacheEntry<SK, V>>
                 if (!secondKeyChecker.test(entry.getKey())) {
                   return;
                 }
-                entryGroup.computeCacheEntry(
+                entryGroup.computeCacheEntryIfPresent(
                     entry.getKey(),
                     memory ->
                         (secondKey, cacheEntry) -> {
-                          if (Objects.nonNull(cacheEntry)) {
-                            
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
-                          }
+                          
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
                           return cacheEntry;
                         });
               });
     }
-    mayEvict();
-  }
-
-  @Override
-  public void update(
-      final Predicate<FK> firstKeyChecker,
-      final Predicate<SK> secondKeyChecker,
-      final ToIntFunction<V> updater) {
-    for (final FK firstKey : firstKeyMap.getAllKeys()) {
-      if (!firstKeyChecker.test(firstKey)) {
-        continue;
-      }
-      final ICacheEntryGroup<FK, SK, V, T> entryGroup = 
firstKeyMap.get(firstKey);
-      if (Objects.nonNull(entryGroup)) {
-        entryGroup
-            .getAllCacheEntries()
-            .forEachRemaining(
-                entry -> {
-                  if (!secondKeyChecker.test(entry.getKey())) {
-                    return;
-                  }
-                  entryGroup.computeCacheEntry(
-                      entry.getKey(),
-                      memory ->
-                          (secondKey, cacheEntry) -> {
-                            
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
-                            return cacheEntry;
-                          });
-                });
-      }
-      mayEvict();
-    }
   }
 
   private void mayEvict() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
index d1f8ab9923e..0791fadc325 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
@@ -45,6 +45,9 @@ interface ICacheEntryGroup<FK, SK, V, T extends 
ICacheEntry<SK, V>> {
   T computeCacheEntry(
       final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>> 
computation);
 
+  T computeCacheEntryIfPresent(
+      final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>> 
computation);
+
   long removeCacheEntry(final SK secondKey);
 
   boolean isEmpty();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 62e7e803b5e..0f60d2f9ba6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -3356,12 +3356,12 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
     clearCacheStatement.setOnCluster(ctx.LOCAL() == null);
 
     if (ctx.SCHEMA() != null) {
-      
clearCacheStatement.setOptions(Collections.singleton(CacheClearOptions.TREE_SCHEMA));
+      
clearCacheStatement.setOptions(Collections.singleton(CacheClearOptions.SCHEMA));
     } else if (ctx.QUERY() != null) {
       
clearCacheStatement.setOptions(Collections.singleton(CacheClearOptions.QUERY));
     } else if (ctx.ALL() != null) {
       clearCacheStatement.setOptions(
-          new HashSet<>(Arrays.asList(CacheClearOptions.TREE_SCHEMA, 
CacheClearOptions.QUERY)));
+          new HashSet<>(Arrays.asList(CacheClearOptions.SCHEMA, 
CacheClearOptions.QUERY)));
     } else {
       
clearCacheStatement.setOptions(Collections.singleton(CacheClearOptions.DEFAULT));
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index d2f11d65d1f..144f2396ef7 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -260,6 +260,11 @@ public class DataNodeSchemaCacheTest {
     Assert.assertEquals(
         new TimeValuePair(2, new TsPrimitiveType.TsInt(2)),
         dataNodeSchemaCache.getLastCache(new MeasurementPath("root.db.d.s3")));
+
+    
Assert.assertTrue(dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage() > 
0);
+
+    dataNodeSchemaCache.cleanUp();
+    Assert.assertEquals(0, 
dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage());
   }
 
   @Test
@@ -313,5 +318,13 @@ public class DataNodeSchemaCacheTest {
     Assert.assertEquals(1, measurementPaths.size());
     Assert.assertEquals(TSDataType.FLOAT, 
measurementPaths.get(0).getMeasurementSchema().getType());
     Assert.assertEquals("root.sg1.d3.s1", 
measurementPaths.get(0).getFullPath());
+
+    dataNodeSchemaCache.getDeviceSchemaCache().invalidateSchema();
+    dataNodeSchemaCache.getDeviceSchemaCache().invalidateSchema();
+    dataNodeSchemaCache.getDeviceSchemaCache().invalidateSchema();
+    
Assert.assertTrue(dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage() > 
0);
+
+    dataNodeSchemaCache.getDeviceSchemaCache().invalidateAll();
+    Assert.assertEquals(0, 
dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage());
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/cache/CacheClearOptions.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/cache/CacheClearOptions.java
index 3ee5ba02f77..076f859eae7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/cache/CacheClearOptions.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/cache/CacheClearOptions.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.commons.schema.cache;
 
 public enum CacheClearOptions {
-  TREE_SCHEMA,
+  SCHEMA,
   QUERY,
   DEFAULT,
 }

Reply via email to