This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2778868aa1e4c5862db4b46760db892f64d4d4be Author: Caideyipi <[email protected]> AuthorDate: Tue Dec 2 12:29:04 2025 +0800 Fixed the schema cache calculation 2 & The potential NPE caused by concurrent invalidate and update (#16834) * fix * test * sl * fix * fix * fix * sonar * fix * fix * test (cherry picked from commit cb705d17b85b93ca8808c7eb786574a2729c6a0f) --- .../dualkeycache/impl/CacheEntryGroupImpl.java | 6 +++ .../schema/dualkeycache/impl/DualKeyCacheImpl.java | 61 +++++++++------------- .../schema/dualkeycache/impl/ICacheEntryGroup.java | 3 ++ .../fetcher/cache/TableDeviceCacheEntry.java | 8 +-- .../fetcher/cache/TableDeviceLastCache.java | 56 +++++++++++++------- .../fetcher/cache/TableDeviceSchemaCache.java | 6 +-- .../fetcher/cache/TreeDeviceNormalSchema.java | 4 +- .../cache/TreeDeviceSchemaCacheManagerTest.java | 14 +++++ .../fetcher/cache/TableDeviceSchemaCacheTest.java | 9 ++++ 9 files changed, 101 insertions(+), 66 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java index 54c53a15d50..ea7f502a90b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java @@ -72,6 +72,12 @@ public class CacheEntryGroupImpl<FK, SK, V, T extends ICacheEntry<SK, V>> return cacheEntryMap.compute(secondKey, computation.apply(memory)); } + @Override + public T computeCacheEntryIfPresent( + final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>> computation) { + return cacheEntryMap.computeIfPresent(secondKey, computation.apply(memory)); + } + @Override public long removeCacheEntry(final SK secondKey) { final T result = cacheEntryMap.remove(secondKey); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java index 86d10422249..1778920a23a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java @@ -144,7 +144,28 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>> @Override public void update( final FK firstKey, final Predicate<SK> secondKeyChecker, final ToIntFunction<V> updater) { - final ICacheEntryGroup<FK, SK, V, T> entryGroup = firstKeyMap.get(firstKey); + clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater); + mayEvict(); + } + + @Override + public void update( + final Predicate<FK> firstKeyChecker, + final Predicate<SK> secondKeyChecker, + final ToIntFunction<V> updater) { + for (final FK firstKey : firstKeyMap.getAllKeys()) { + if (!firstKeyChecker.test(firstKey)) { + continue; + } + clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater); + } + mayEvict(); + } + + public void clearSecondEntry( + final ICacheEntryGroup<FK, SK, V, T> entryGroup, + final Predicate<SK> secondKeyChecker, + final ToIntFunction<V> updater) { if (Objects.nonNull(entryGroup)) { entryGroup .getAllCacheEntries() @@ -153,49 +174,15 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>> if (!secondKeyChecker.test(entry.getKey())) { return; } - entryGroup.computeCacheEntry( + entryGroup.computeCacheEntryIfPresent( entry.getKey(), memory -> (secondKey, cacheEntry) -> { - if (Objects.nonNull(cacheEntry)) { - memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue())); - } + memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue())); return cacheEntry; }); }); } - mayEvict(); - } - - @Override - public void update( - final Predicate<FK> firstKeyChecker, - final Predicate<SK> secondKeyChecker, - final ToIntFunction<V> updater) { - for (final FK firstKey : firstKeyMap.getAllKeys()) { - if (!firstKeyChecker.test(firstKey)) { - continue; - } - final ICacheEntryGroup<FK, SK, V, T> entryGroup = firstKeyMap.get(firstKey); - if (Objects.nonNull(entryGroup)) { - entryGroup - .getAllCacheEntries() - .forEachRemaining( - entry -> { - if (!secondKeyChecker.test(entry.getKey())) { - return; - } - entryGroup.computeCacheEntry( - entry.getKey(), - memory -> - (secondKey, cacheEntry) -> { - memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue())); - return cacheEntry; - }); - }); - } - mayEvict(); - } } private void mayEvict() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java index d1f8ab9923e..0791fadc325 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java @@ -45,6 +45,9 @@ interface ICacheEntryGroup<FK, SK, V, T extends ICacheEntry<SK, V>> { T computeCacheEntry( final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>> computation); + T computeCacheEntryIfPresent( + final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>> computation); + long removeCacheEntry(final SK secondKey); boolean isEmpty(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java index 1f00d28dc59..7af9c25af6d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java @@ -184,13 +184,13 @@ public class TableDeviceCacheEntry { final boolean isInvalidate, final boolean isTableModel) { int result = - lastCache.compareAndSet(null, new TableDeviceLastCache()) + lastCache.compareAndSet(null, new TableDeviceLastCache(isTableModel)) ? TableDeviceLastCache.INSTANCE_SIZE : 0; final TableDeviceLastCache cache = lastCache.get(); result += Objects.nonNull(cache) - ? cache.initOrInvalidate(database, tableName, measurements, isInvalidate, isTableModel) + ? cache.initOrInvalidate(database, tableName, measurements, isInvalidate) : 0; return Objects.nonNull(lastCache.get()) ? result : 0; } @@ -207,9 +207,9 @@ public class TableDeviceCacheEntry { return tryUpdateLastCache(measurements, timeValuePairs, false); } - int invalidateLastCache(final String measurement, final boolean isTableModel) { + int invalidateLastCache(final String measurement) { final TableDeviceLastCache cache = lastCache.get(); - final int result = Objects.nonNull(cache) ? cache.invalidate(measurement, isTableModel) : 0; + final int result = Objects.nonNull(cache) ? cache.invalidate(measurement) : 0; return Objects.nonNull(lastCache.get()) ? result : 0; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java index f814139e880..5a2ac3d1e53 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java @@ -93,13 +93,17 @@ public class TableDeviceLastCache { // Time is seen as "" as a measurement private final Map<String, TimeValuePair> measurement2CachedLastMap = new ConcurrentHashMap<>(); + private final boolean isTableModel; + + TableDeviceLastCache(final boolean isTableModel) { + this.isTableModel = isTableModel; + } int initOrInvalidate( final String database, final String tableName, final String[] measurements, - final boolean isInvalidate, - final boolean isTableModel) { + final boolean isInvalidate) { final AtomicInteger diff = new AtomicInteger(0); for (final String measurement : measurements) { @@ -121,13 +125,13 @@ public class TableDeviceLastCache { if (Objects.isNull(newPair)) { diff.addAndGet( -((isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(finalMeasurement)) - + getTVPairEntrySize(tvPair))); + + getTvPairEntrySize(tvPair))); return null; } if (Objects.isNull(tvPair)) { diff.addAndGet( (isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(finalMeasurement)) - + getTVPairEntrySize(newPair)); + + getTvPairEntrySize(newPair)); return newPair; } return tvPair; @@ -151,7 +155,9 @@ public class TableDeviceLastCache { for (int i = 0; i < measurements.length; ++i) { if (Objects.isNull(timeValuePairs[i])) { if (invalidateNull) { - measurement2CachedLastMap.remove(measurements[i]); + diff.addAndGet( + -((int) RamUsageEstimator.sizeOf(measurements[i]) + + getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i])))); } continue; } @@ -181,7 +187,7 @@ public class TableDeviceLastCache { } @GuardedBy("DataRegionInsertLock#writeLock") - int invalidate(final String measurement, final boolean isTableModel) { + int invalidate(final String measurement) { final AtomicInteger diff = new AtomicInteger(); final AtomicLong time = new AtomicLong(); measurement2CachedLastMap.computeIfPresent( @@ -189,7 +195,7 @@ public class TableDeviceLastCache { (s, timeValuePair) -> { diff.set( (isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(s)) - + getTVPairEntrySize(timeValuePair)); + + getTvPairEntrySize(timeValuePair)); time.set(timeValuePair.getTimestamp()); return null; }); @@ -200,7 +206,7 @@ public class TableDeviceLastCache { "", (s, timeValuePair) -> { if (timeValuePair.getTimestamp() <= time.get()) { - diff.addAndGet(getTVPairEntrySize(timeValuePair)); + diff.addAndGet((int) RamUsageEstimator.sizeOf(s) + getTvPairEntrySize(timeValuePair)); return null; } return timeValuePair; @@ -209,13 +215,18 @@ public class TableDeviceLastCache { return diff.get(); } - private int getTVPairEntrySize(final TimeValuePair tvPair) { - return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY - + ((Objects.isNull(tvPair) - || tvPair == PLACEHOLDER_TIME_VALUE_PAIR - || tvPair == EMPTY_TIME_VALUE_PAIR) - ? 0 - : tvPair.getSize()); + private static int getTvPairEntrySize(final TimeValuePair tvPair) { + return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + getTvPairSize(tvPair); + } + + private static int getTvPairSize(final TimeValuePair tvPair) { + return isEmptyTvPair(tvPair) ? 0 : tvPair.getSize(); + } + + private static boolean isEmptyTvPair(final TimeValuePair tvPair) { + return Objects.isNull(tvPair) + || tvPair == PLACEHOLDER_TIME_VALUE_PAIR + || tvPair == EMPTY_TIME_VALUE_PAIR; } @Nullable @@ -262,16 +273,21 @@ public class TableDeviceLastCache { int estimateSize() { return INSTANCE_SIZE + (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY * measurement2CachedLastMap.size() - + measurement2CachedLastMap.values().stream() - .mapToInt(TimeValuePair::getSize) + + measurement2CachedLastMap.entrySet().stream() + .mapToInt( + entry -> + (isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(entry.getKey())) + + TableDeviceLastCache.getTvPairSize(entry.getValue())) .reduce(0, Integer::sum); } private static int getDiffSize( final TimeValuePair oldTimeValuePair, final TimeValuePair newTimeValuePair) { - if (oldTimeValuePair == EMPTY_TIME_VALUE_PAIR - || oldTimeValuePair == PLACEHOLDER_TIME_VALUE_PAIR) { - return newTimeValuePair.getSize(); + if (isEmptyTvPair(oldTimeValuePair)) { + return getTvPairSize(newTimeValuePair); + } + if (isEmptyTvPair(newTimeValuePair)) { + return -getTvPairSize(oldTimeValuePair); } final TsPrimitiveType oldValue = oldTimeValuePair.getValue(); final TsPrimitiveType newValue = newTimeValuePair.getValue(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java index acb9c5e6180..ebbb137a2d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java @@ -456,7 +456,7 @@ public class TableDeviceSchemaCache { final ToIntFunction<TableDeviceCacheEntry> updateFunction = PathPatternUtil.hasWildcard(measurement) ? entry -> -entry.invalidateLastCache() - : entry -> -entry.invalidateLastCache(measurement, false); + : entry -> -entry.invalidateLastCache(measurement); if (!devicePath.hasWildcard()) { final IDeviceID deviceID = devicePath.getIDeviceID(); @@ -543,7 +543,7 @@ public class TableDeviceSchemaCache { return dualKeyCache.stats().requestCount(); } - long getMemoryUsage() { + public long getMemoryUsage() { return dualKeyCache.stats().memoryUsage(); } @@ -679,7 +679,7 @@ public class TableDeviceSchemaCache { final ToIntFunction<TableDeviceCacheEntry> updateFunction = isAttributeColumn ? entry -> -entry.invalidateAttributeColumn(columnName) - : entry -> -entry.invalidateLastCache(columnName, true); + : entry -> -entry.invalidateLastCache(columnName); dualKeyCache.update(new TableId(null, tableName), deviceID -> true, updateFunction); } finally { readWriteLock.writeLock().unlock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java index 4e9c2fe9fde..da37202c470 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java @@ -99,13 +99,13 @@ public class TreeDeviceNormalSchema implements IDeviceSchema { public int estimateSize() { // Do not need to calculate database because it is interned return INSTANCE_SIZE + + measurementMap.size() * (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + measurementMap.entrySet().stream() .mapToInt( entry -> Math.toIntExact( RamUsageEstimator.sizeOf(entry.getKey()) - + SchemaCacheEntry.estimateSize(entry.getValue()) - + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY)) + + SchemaCacheEntry.estimateSize(entry.getValue()))) .reduce(0, Integer::sum); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java index b166e3d3772..e8d7644e1fb 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.SchemaCacheEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager; import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; import org.apache.iotdb.db.schemaengine.template.Template; @@ -264,6 +265,11 @@ public class TreeDeviceSchemaCacheManagerTest { Assert.assertEquals( new TimeValuePair(2, new TsPrimitiveType.TsInt(2)), treeDeviceSchemaCacheManager.getLastCache(new MeasurementPath("root.db.d.s3"))); + + Assert.assertTrue(TableDeviceSchemaCache.getInstance().getMemoryUsage() > 0); + + treeDeviceSchemaCacheManager.cleanUp(); + Assert.assertEquals(0, TableDeviceSchemaCache.getInstance().getMemoryUsage()); } @Test @@ -317,5 +323,13 @@ public class TreeDeviceSchemaCacheManagerTest { Assert.assertEquals(1, measurementPaths.size()); Assert.assertEquals(TSDataType.FLOAT, measurementPaths.get(0).getMeasurementSchema().getType()); Assert.assertEquals("root.sg1.d3.s1", measurementPaths.get(0).getFullPath()); + + treeDeviceSchemaCacheManager.invalidateLastCache(new MeasurementPath("root.sg1.**")); + treeDeviceSchemaCacheManager.invalidateDatabaseLastCache("root.sg1"); + TableDeviceSchemaCache.getInstance().invalidateTreeSchema(); + Assert.assertTrue(TableDeviceSchemaCache.getInstance().getMemoryUsage() > 0); + + TableDeviceSchemaCache.getInstance().invalidateAll(); + Assert.assertEquals(0, TableDeviceSchemaCache.getInstance().getMemoryUsage()); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java index 88aed090981..cde0ffbb13f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java @@ -248,6 +248,11 @@ public class TableDeviceSchemaCacheTest { Assert.assertNull( cache.getDeviceAttribute( database1, convertTagValuesToDeviceID(table2, new String[] {"hebei", "p_1", "d_1"}))); + + Assert.assertTrue(cache.getMemoryUsage() > 0); + + cache.invalidateAll(); + Assert.assertEquals(0, cache.getMemoryUsage()); } @Test @@ -502,6 +507,10 @@ public class TableDeviceSchemaCacheTest { TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE, new TsPrimitiveType.TsInt(3), }, result.get().getRight()); + + cache.invalidateLastCache(); + cache.invalidateLastCache(); + Assert.assertTrue(cache.getMemoryUsage() > 0); } @Test
