This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cb705d17b85 Fixed the schema cache calculation 2 & The potential NPE
caused by concurrent invalidate and update (#16834)
cb705d17b85 is described below
commit cb705d17b85b93ca8808c7eb786574a2729c6a0f
Author: Caideyipi <[email protected]>
AuthorDate: Tue Dec 2 12:29:04 2025 +0800
Fixed the schema cache calculation 2 & The potential NPE caused by
concurrent invalidate and update (#16834)
* fix
* test
* sl
* fix
* fix
* fix
* sonar
* fix
* fix
* test
---
.../dualkeycache/impl/CacheEntryGroupImpl.java | 6 +++
.../schema/dualkeycache/impl/DualKeyCacheImpl.java | 61 +++++++++-------------
.../schema/dualkeycache/impl/ICacheEntryGroup.java | 3 ++
.../fetcher/cache/TableDeviceCacheEntry.java | 8 +--
.../fetcher/cache/TableDeviceLastCache.java | 56 +++++++++++++-------
.../fetcher/cache/TableDeviceSchemaCache.java | 6 +--
.../fetcher/cache/TreeDeviceNormalSchema.java | 4 +-
.../cache/TreeDeviceSchemaCacheManagerTest.java | 14 +++++
.../fetcher/cache/TableDeviceSchemaCacheTest.java | 9 ++++
9 files changed, 101 insertions(+), 66 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
index 54c53a15d50..ea7f502a90b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
@@ -72,6 +72,12 @@ public class CacheEntryGroupImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
return cacheEntryMap.compute(secondKey, computation.apply(memory));
}
+ @Override
+ public T computeCacheEntryIfPresent(
+ final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>>
computation) {
+ return cacheEntryMap.computeIfPresent(secondKey,
computation.apply(memory));
+ }
+
@Override
public long removeCacheEntry(final SK secondKey) {
final T result = cacheEntryMap.remove(secondKey);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
index 86d10422249..1778920a23a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
@@ -144,7 +144,28 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
@Override
public void update(
final FK firstKey, final Predicate<SK> secondKeyChecker, final
ToIntFunction<V> updater) {
- final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
+ clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater);
+ mayEvict();
+ }
+
+ @Override
+ public void update(
+ final Predicate<FK> firstKeyChecker,
+ final Predicate<SK> secondKeyChecker,
+ final ToIntFunction<V> updater) {
+ for (final FK firstKey : firstKeyMap.getAllKeys()) {
+ if (!firstKeyChecker.test(firstKey)) {
+ continue;
+ }
+ clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater);
+ }
+ mayEvict();
+ }
+
+ public void clearSecondEntry(
+ final ICacheEntryGroup<FK, SK, V, T> entryGroup,
+ final Predicate<SK> secondKeyChecker,
+ final ToIntFunction<V> updater) {
if (Objects.nonNull(entryGroup)) {
entryGroup
.getAllCacheEntries()
@@ -153,49 +174,15 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
if (!secondKeyChecker.test(entry.getKey())) {
return;
}
- entryGroup.computeCacheEntry(
+ entryGroup.computeCacheEntryIfPresent(
entry.getKey(),
memory ->
(secondKey, cacheEntry) -> {
- if (Objects.nonNull(cacheEntry)) {
-
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
- }
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
return cacheEntry;
});
});
}
- mayEvict();
- }
-
- @Override
- public void update(
- final Predicate<FK> firstKeyChecker,
- final Predicate<SK> secondKeyChecker,
- final ToIntFunction<V> updater) {
- for (final FK firstKey : firstKeyMap.getAllKeys()) {
- if (!firstKeyChecker.test(firstKey)) {
- continue;
- }
- final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
- if (Objects.nonNull(entryGroup)) {
- entryGroup
- .getAllCacheEntries()
- .forEachRemaining(
- entry -> {
- if (!secondKeyChecker.test(entry.getKey())) {
- return;
- }
- entryGroup.computeCacheEntry(
- entry.getKey(),
- memory ->
- (secondKey, cacheEntry) -> {
-
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
- return cacheEntry;
- });
- });
- }
- mayEvict();
- }
}
private void mayEvict() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
index d1f8ab9923e..0791fadc325 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
@@ -45,6 +45,9 @@ interface ICacheEntryGroup<FK, SK, V, T extends ICacheEntry<SK, V>> {
T computeCacheEntry(
final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>>
computation);
+ T computeCacheEntryIfPresent(
+ final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>>
computation);
+
long removeCacheEntry(final SK secondKey);
boolean isEmpty();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
index 1f00d28dc59..7af9c25af6d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
@@ -184,13 +184,13 @@ public class TableDeviceCacheEntry {
final boolean isInvalidate,
final boolean isTableModel) {
int result =
- lastCache.compareAndSet(null, new TableDeviceLastCache())
+ lastCache.compareAndSet(null, new TableDeviceLastCache(isTableModel))
? TableDeviceLastCache.INSTANCE_SIZE
: 0;
final TableDeviceLastCache cache = lastCache.get();
result +=
Objects.nonNull(cache)
- ? cache.initOrInvalidate(database, tableName, measurements,
isInvalidate, isTableModel)
+ ? cache.initOrInvalidate(database, tableName, measurements,
isInvalidate)
: 0;
return Objects.nonNull(lastCache.get()) ? result : 0;
}
@@ -207,9 +207,9 @@ public class TableDeviceCacheEntry {
return tryUpdateLastCache(measurements, timeValuePairs, false);
}
- int invalidateLastCache(final String measurement, final boolean
isTableModel) {
+ int invalidateLastCache(final String measurement) {
final TableDeviceLastCache cache = lastCache.get();
- final int result = Objects.nonNull(cache) ? cache.invalidate(measurement,
isTableModel) : 0;
+ final int result = Objects.nonNull(cache) ? cache.invalidate(measurement)
: 0;
return Objects.nonNull(lastCache.get()) ? result : 0;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
index f814139e880..5a2ac3d1e53 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
@@ -93,13 +93,17 @@ public class TableDeviceLastCache {
// Time is seen as "" as a measurement
private final Map<String, TimeValuePair> measurement2CachedLastMap = new
ConcurrentHashMap<>();
+ private final boolean isTableModel;
+
+ TableDeviceLastCache(final boolean isTableModel) {
+ this.isTableModel = isTableModel;
+ }
int initOrInvalidate(
final String database,
final String tableName,
final String[] measurements,
- final boolean isInvalidate,
- final boolean isTableModel) {
+ final boolean isInvalidate) {
final AtomicInteger diff = new AtomicInteger(0);
for (final String measurement : measurements) {
@@ -121,13 +125,13 @@ public class TableDeviceLastCache {
if (Objects.isNull(newPair)) {
diff.addAndGet(
-((isTableModel ? 0 : (int)
RamUsageEstimator.sizeOf(finalMeasurement))
- + getTVPairEntrySize(tvPair)));
+ + getTvPairEntrySize(tvPair)));
return null;
}
if (Objects.isNull(tvPair)) {
diff.addAndGet(
(isTableModel ? 0 : (int)
RamUsageEstimator.sizeOf(finalMeasurement))
- + getTVPairEntrySize(newPair));
+ + getTvPairEntrySize(newPair));
return newPair;
}
return tvPair;
@@ -151,7 +155,9 @@ public class TableDeviceLastCache {
for (int i = 0; i < measurements.length; ++i) {
if (Objects.isNull(timeValuePairs[i])) {
if (invalidateNull) {
- measurement2CachedLastMap.remove(measurements[i]);
+ diff.addAndGet(
+ -((int) RamUsageEstimator.sizeOf(measurements[i])
+ +
getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i]))));
}
continue;
}
@@ -181,7 +187,7 @@ public class TableDeviceLastCache {
}
@GuardedBy("DataRegionInsertLock#writeLock")
- int invalidate(final String measurement, final boolean isTableModel) {
+ int invalidate(final String measurement) {
final AtomicInteger diff = new AtomicInteger();
final AtomicLong time = new AtomicLong();
measurement2CachedLastMap.computeIfPresent(
@@ -189,7 +195,7 @@ public class TableDeviceLastCache {
(s, timeValuePair) -> {
diff.set(
(isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(s))
- + getTVPairEntrySize(timeValuePair));
+ + getTvPairEntrySize(timeValuePair));
time.set(timeValuePair.getTimestamp());
return null;
});
@@ -200,7 +206,7 @@ public class TableDeviceLastCache {
"",
(s, timeValuePair) -> {
if (timeValuePair.getTimestamp() <= time.get()) {
- diff.addAndGet(getTVPairEntrySize(timeValuePair));
+ diff.addAndGet((int) RamUsageEstimator.sizeOf(s) +
getTvPairEntrySize(timeValuePair));
return null;
}
return timeValuePair;
@@ -209,13 +215,18 @@ public class TableDeviceLastCache {
return diff.get();
}
- private int getTVPairEntrySize(final TimeValuePair tvPair) {
- return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
- + ((Objects.isNull(tvPair)
- || tvPair == PLACEHOLDER_TIME_VALUE_PAIR
- || tvPair == EMPTY_TIME_VALUE_PAIR)
- ? 0
- : tvPair.getSize());
+ private static int getTvPairEntrySize(final TimeValuePair tvPair) {
+ return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY +
getTvPairSize(tvPair);
+ }
+
+ private static int getTvPairSize(final TimeValuePair tvPair) {
+ return isEmptyTvPair(tvPair) ? 0 : tvPair.getSize();
+ }
+
+ private static boolean isEmptyTvPair(final TimeValuePair tvPair) {
+ return Objects.isNull(tvPair)
+ || tvPair == PLACEHOLDER_TIME_VALUE_PAIR
+ || tvPair == EMPTY_TIME_VALUE_PAIR;
}
@Nullable
@@ -262,16 +273,21 @@ public class TableDeviceLastCache {
int estimateSize() {
return INSTANCE_SIZE
+ (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY *
measurement2CachedLastMap.size()
- + measurement2CachedLastMap.values().stream()
- .mapToInt(TimeValuePair::getSize)
+ + measurement2CachedLastMap.entrySet().stream()
+ .mapToInt(
+ entry ->
+ (isTableModel ? 0 : (int)
RamUsageEstimator.sizeOf(entry.getKey()))
+ + TableDeviceLastCache.getTvPairSize(entry.getValue()))
.reduce(0, Integer::sum);
}
private static int getDiffSize(
final TimeValuePair oldTimeValuePair, final TimeValuePair
newTimeValuePair) {
- if (oldTimeValuePair == EMPTY_TIME_VALUE_PAIR
- || oldTimeValuePair == PLACEHOLDER_TIME_VALUE_PAIR) {
- return newTimeValuePair.getSize();
+ if (isEmptyTvPair(oldTimeValuePair)) {
+ return getTvPairSize(newTimeValuePair);
+ }
+ if (isEmptyTvPair(newTimeValuePair)) {
+ return -getTvPairSize(oldTimeValuePair);
}
final TsPrimitiveType oldValue = oldTimeValuePair.getValue();
final TsPrimitiveType newValue = newTimeValuePair.getValue();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
index acb9c5e6180..ebbb137a2d8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
@@ -456,7 +456,7 @@ public class TableDeviceSchemaCache {
final ToIntFunction<TableDeviceCacheEntry> updateFunction =
PathPatternUtil.hasWildcard(measurement)
? entry -> -entry.invalidateLastCache()
- : entry -> -entry.invalidateLastCache(measurement, false);
+ : entry -> -entry.invalidateLastCache(measurement);
if (!devicePath.hasWildcard()) {
final IDeviceID deviceID = devicePath.getIDeviceID();
@@ -543,7 +543,7 @@ public class TableDeviceSchemaCache {
return dualKeyCache.stats().requestCount();
}
- long getMemoryUsage() {
+ public long getMemoryUsage() {
return dualKeyCache.stats().memoryUsage();
}
@@ -679,7 +679,7 @@ public class TableDeviceSchemaCache {
final ToIntFunction<TableDeviceCacheEntry> updateFunction =
isAttributeColumn
? entry -> -entry.invalidateAttributeColumn(columnName)
- : entry -> -entry.invalidateLastCache(columnName, true);
+ : entry -> -entry.invalidateLastCache(columnName);
dualKeyCache.update(new TableId(null, tableName), deviceID -> true,
updateFunction);
} finally {
readWriteLock.writeLock().unlock();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java
index 4e9c2fe9fde..da37202c470 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java
@@ -99,13 +99,13 @@ public class TreeDeviceNormalSchema implements IDeviceSchema {
public int estimateSize() {
// Do not need to calculate database because it is interned
return INSTANCE_SIZE
+ + measurementMap.size() * (int)
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+ measurementMap.entrySet().stream()
.mapToInt(
entry ->
Math.toIntExact(
RamUsageEstimator.sizeOf(entry.getKey())
- + SchemaCacheEntry.estimateSize(entry.getValue())
- + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY))
+ + SchemaCacheEntry.estimateSize(entry.getValue())))
.reduce(0, Integer::sum);
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
index b166e3d3772..e8d7644e1fb 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.SchemaCacheEntry;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;
@@ -264,6 +265,11 @@ public class TreeDeviceSchemaCacheManagerTest {
Assert.assertEquals(
new TimeValuePair(2, new TsPrimitiveType.TsInt(2)),
treeDeviceSchemaCacheManager.getLastCache(new
MeasurementPath("root.db.d.s3")));
+
+ Assert.assertTrue(TableDeviceSchemaCache.getInstance().getMemoryUsage() >
0);
+
+ treeDeviceSchemaCacheManager.cleanUp();
+ Assert.assertEquals(0,
TableDeviceSchemaCache.getInstance().getMemoryUsage());
}
@Test
@@ -317,5 +323,13 @@ public class TreeDeviceSchemaCacheManagerTest {
Assert.assertEquals(1, measurementPaths.size());
Assert.assertEquals(TSDataType.FLOAT,
measurementPaths.get(0).getMeasurementSchema().getType());
Assert.assertEquals("root.sg1.d3.s1",
measurementPaths.get(0).getFullPath());
+
+ treeDeviceSchemaCacheManager.invalidateLastCache(new
MeasurementPath("root.sg1.**"));
+ treeDeviceSchemaCacheManager.invalidateDatabaseLastCache("root.sg1");
+ TableDeviceSchemaCache.getInstance().invalidateTreeSchema();
+ Assert.assertTrue(TableDeviceSchemaCache.getInstance().getMemoryUsage() >
0);
+
+ TableDeviceSchemaCache.getInstance().invalidateAll();
+ Assert.assertEquals(0,
TableDeviceSchemaCache.getInstance().getMemoryUsage());
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java
index 88aed090981..cde0ffbb13f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java
@@ -248,6 +248,11 @@ public class TableDeviceSchemaCacheTest {
Assert.assertNull(
cache.getDeviceAttribute(
database1, convertTagValuesToDeviceID(table2, new String[]
{"hebei", "p_1", "d_1"})));
+
+ Assert.assertTrue(cache.getMemoryUsage() > 0);
+
+ cache.invalidateAll();
+ Assert.assertEquals(0, cache.getMemoryUsage());
}
@Test
@@ -502,6 +507,10 @@ public class TableDeviceSchemaCacheTest {
TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE, new
TsPrimitiveType.TsInt(3),
},
result.get().getRight());
+
+ cache.invalidateLastCache();
+ cache.invalidateLastCache();
+ Assert.assertTrue(cache.getMemoryUsage() > 0);
}
@Test