This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9a7e114a5b6 Ensure concurrency safety when updating last cache
9a7e114a5b6 is described below
commit 9a7e114a5b62412cd3384f147f5c5224a30676c8
Author: Chen YZ <[email protected]>
AuthorDate: Wed Sep 27 08:21:09 2023 +0800
Ensure concurrency safety when updating last cache
---
.../analyze/cache/schema/DataNodeSchemaCache.java | 43 +++++++++-----
.../analyze/cache/schema/SchemaCacheEntry.java | 16 +++++-
.../cache/schema/TimeSeriesSchemaCache.java | 62 +++++++++++++++------
.../schema/dualkeycache/impl/DualKeyCacheImpl.java | 65 +++++++++++++---------
.../dualkeycache/impl/FIFOCacheEntryManager.java | 7 ++-
.../schema/dualkeycache/impl/ICacheEntry.java | 2 +
.../dualkeycache/impl/LRUCacheEntryManager.java | 7 ++-
.../schema/lastcache/DataNodeLastCacheManager.java | 6 +-
.../cache/dualkeycache/DualKeyCacheTest.java | 4 +-
9 files changed, 143 insertions(+), 69 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
index 90cebfee2bf..7b50491ebc0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
@@ -42,7 +43,8 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntPredicate;
/**
* This class takes the responsibility of metadata cache management of all
DataRegions under
@@ -191,6 +193,7 @@ public class DataNodeSchemaCache {
}
/** get SchemaCacheEntry and update last cache */
+ @TestOnly
public void updateLastCache(
PartialPath devicePath,
String measurement,
@@ -207,20 +210,25 @@ public class DataNodeSchemaCache {
String[] measurements,
MeasurementSchema[] measurementSchemas,
boolean isAligned,
- Function<Integer, TimeValuePair> timeValuePairProvider,
- Function<Integer, Boolean> shouldUpdateProvider,
+ IntFunction<TimeValuePair> timeValuePairProvider,
+ IntPredicate shouldUpdateProvider,
boolean highPriorityUpdate,
Long latestFlushedTime) {
- timeSeriesSchemaCache.updateLastCache(
- database,
- devicePath,
- measurements,
- measurementSchemas,
- isAligned,
- timeValuePairProvider,
- shouldUpdateProvider,
- highPriorityUpdate,
- latestFlushedTime);
+ takeReadLock();
+ try {
+ timeSeriesSchemaCache.updateLastCache(
+ database,
+ devicePath,
+ measurements,
+ measurementSchemas,
+ isAligned,
+ timeValuePairProvider,
+ shouldUpdateProvider,
+ highPriorityUpdate,
+ latestFlushedTime);
+ } finally {
+ releaseReadLock();
+ }
}
/**
@@ -233,8 +241,13 @@ public class DataNodeSchemaCache {
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
Long latestFlushedTime) {
- timeSeriesSchemaCache.updateLastCache(
- storageGroup, measurementPath, timeValuePair, highPriorityUpdate,
latestFlushedTime);
+ takeReadLock();
+ try {
+ timeSeriesSchemaCache.updateLastCache(
+ storageGroup, measurementPath, timeValuePair, highPriorityUpdate,
latestFlushedTime);
+ } finally {
+ releaseReadLock();
+ }
}
public void invalidateAll() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
index 790558c9d9f..8df16a91ba5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.ILastCacheContainer;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.LastCacheContainer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -76,14 +77,25 @@ public class SchemaCacheEntry implements
IMeasurementSchemaInfo {
}
public ILastCacheContainer getLastCacheContainer() {
+ return lastCacheContainer;
+ }
+
+ public int updateLastCache(
+ TimeValuePair timeValuePair, boolean highPriorityUpdate, Long
latestFlushedTime) {
+
if (lastCacheContainer == null) {
synchronized (this) {
if (lastCacheContainer == null) {
- lastCacheContainer = new LastCacheContainer();
+ ILastCacheContainer tmp = new LastCacheContainer();
+ int changeSize = tmp.estimateSize();
+ changeSize += tmp.updateCachedLast(timeValuePair,
highPriorityUpdate, latestFlushedTime);
+ lastCacheContainer = tmp;
+ return changeSize;
}
}
}
- return lastCacheContainer;
+ return lastCacheContainer.updateCachedLast(
+ timeValuePair, highPriorityUpdate, latestFlushedTime);
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
index 0d3321d9d73..a1b8f508f7e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.exception.metadata.view.InsertNonWritableViewException;
@@ -46,7 +47,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntPredicate;
public class TimeSeriesSchemaCache {
@@ -222,12 +224,6 @@ public class TimeSeriesSchemaCache {
return new Pair<>(indexOfMissingMeasurements, missedPathStringList);
}
- public void put(ClusterSchemaTree schemaTree) {
- for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
-
putSingleMeasurementPath(schemaTree.getBelongedDatabase(measurementPath),
measurementPath);
- }
- }
-
public void putSingleMeasurementPath(String storageGroup, MeasurementPath
measurementPath) {
SchemaCacheEntry schemaCacheEntry =
new SchemaCacheEntry(
@@ -250,6 +246,7 @@ public class TimeSeriesSchemaCache {
}
/** get SchemaCacheEntry and update last cache */
+ @TestOnly
public void updateLastCache(
PartialPath devicePath,
String measurement,
@@ -272,8 +269,8 @@ public class TimeSeriesSchemaCache {
String[] measurements,
MeasurementSchema[] measurementSchemas,
boolean isAligned,
- Function<Integer, TimeValuePair> timeValuePairProvider,
- Function<Integer, Boolean> shouldUpdateProvider,
+ IntFunction<TimeValuePair> timeValuePairProvider,
+ IntPredicate shouldUpdateProvider,
boolean highPriorityUpdate,
Long latestFlushedTime) {
SchemaCacheEntry entry;
@@ -292,7 +289,7 @@ public class TimeSeriesSchemaCache {
@Override
public int updateValue(int index, SchemaCacheEntry value) {
- if (!shouldUpdateProvider.apply(index)) {
+ if (!shouldUpdateProvider.test(index)) {
return 0;
}
if (value == null) {
@@ -316,10 +313,28 @@ public class TimeSeriesSchemaCache {
}
}
}
-
- DataNodeLastCacheManager.updateLastCache(
- entry, timeValuePairProvider.apply(index), highPriorityUpdate,
latestFlushedTime);
}
+ dualKeyCache.update(
+ new IDualKeyCacheUpdating<PartialPath, String, SchemaCacheEntry>() {
+ @Override
+ public PartialPath getFirstKey() {
+ return devicePath;
+ }
+
+ @Override
+ public String[] getSecondKeyList() {
+ return missingMeasurements.stream().map(i ->
measurements[i]).toArray(String[]::new);
+ }
+
+ @Override
+ public int updateValue(int index, SchemaCacheEntry value) {
+ return DataNodeLastCacheManager.updateLastCache(
+ value,
+ timeValuePairProvider.apply(missingMeasurements.get(index)),
+ highPriorityUpdate,
+ latestFlushedTime);
+ }
+ });
}
/**
@@ -342,16 +357,31 @@ public class TimeSeriesSchemaCache {
entry =
new SchemaCacheEntry(
storageGroup,
- (MeasurementSchema) measurementPath.getMeasurementSchema(),
+ measurementPath.getMeasurementSchema(),
measurementPath.getTagMap(),
measurementPath.isUnderAlignedEntity());
dualKeyCache.put(seriesPath.getDevicePath(),
seriesPath.getMeasurement(), entry);
}
}
}
+ dualKeyCache.update(
+ new IDualKeyCacheUpdating<PartialPath, String, SchemaCacheEntry>() {
+ @Override
+ public PartialPath getFirstKey() {
+ return measurementPath.getDevicePath();
+ }
- DataNodeLastCacheManager.updateLastCache(
- entry, timeValuePair, highPriorityUpdate, latestFlushedTime);
+ @Override
+ public String[] getSecondKeyList() {
+ return new String[] {measurementPath.getMeasurement()};
+ }
+
+ @Override
+ public int updateValue(int index, SchemaCacheEntry value) {
+ return DataNodeLastCacheManager.updateLastCache(
+ value, timeValuePair, highPriorityUpdate, latestFlushedTime);
+ }
+ });
}
public void invalidateAll() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
index aed8277024f..c02a3173ccd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
@@ -115,14 +115,22 @@ class DualKeyCacheImpl<FK, SK, V, T extends
ICacheEntry<SK, V>>
if (cacheEntry == null) {
updating.updateValue(i, null);
} else {
- int changeSize = updating.updateValue(i, cacheEntry.getValue());
- cacheEntryManager.access(cacheEntry);
- if (changeSize != 0) {
- cacheStats.increaseMemoryUsage(changeSize);
- if (cacheStats.isExceedMemoryCapacity()) {
- executeCacheEviction(changeSize);
+ int changeSize = 0;
+ synchronized (cacheEntry) {
+ if (cacheEntry.getBelongedGroup() != null) {
+ // Only update the value when the cache entry is not evicted.
+ // If the cache entry is evicted, getBelongedGroup is null.
+ // Synchronized is to guarantee the cache entry is not evicted
during the update.
+ changeSize = updating.updateValue(i, cacheEntry.getValue());
+ cacheEntryManager.access(cacheEntry);
+ if (changeSize != 0) {
+ cacheStats.increaseMemoryUsage(changeSize);
+ }
}
}
+ if (changeSize != 0 && cacheStats.isExceedMemoryCapacity()) {
+ executeCacheEviction(changeSize);
+ }
hitCount++;
}
}
@@ -194,31 +202,34 @@ class DualKeyCacheImpl<FK, SK, V, T extends
ICacheEntry<SK, V>>
if (evictCacheEntry == null) {
return 0;
}
- AtomicInteger evictedSize = new AtomicInteger(0);
-
evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue()));
+ synchronized (evictCacheEntry) {
+ AtomicInteger evictedSize = new AtomicInteger(0);
+
evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue()));
- ICacheEntryGroup<FK, SK, V, T> belongedGroup =
evictCacheEntry.getBelongedGroup();
- belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
-
evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
+ ICacheEntryGroup<FK, SK, V, T> belongedGroup =
evictCacheEntry.getBelongedGroup();
+ evictCacheEntry.setBelongedGroup(null);
+ belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
+
evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
- if (belongedGroup.isEmpty()) {
- firstKeyMap.compute(
- belongedGroup.getFirstKey(),
- (firstKey, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- // has been removed by other threads
- return null;
- }
- if (cacheEntryGroup.isEmpty()) {
-
evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- return null;
- }
+ if (belongedGroup.isEmpty()) {
+ firstKeyMap.compute(
+ belongedGroup.getFirstKey(),
+ (firstKey, cacheEntryGroup) -> {
+ if (cacheEntryGroup == null) {
+ // has been removed by other threads
+ return null;
+ }
+ if (cacheEntryGroup.isEmpty()) {
+
evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
+ return null;
+ }
- // some other thread has put value to it
- return cacheEntryGroup;
- });
+ // some other thread has put value to it
+ return cacheEntryGroup;
+ });
+ }
+ return evictedSize.get();
}
- return evictedSize.get();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
index 4d3c488d8eb..661e559dd82 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
@@ -105,7 +105,7 @@ public class FIFOCacheEntryManager<FK, SK, V>
static class FIFOCacheEntry<SK, V> implements ICacheEntry<SK, V> {
private final SK secondKey;
- private final ICacheEntryGroup cacheEntryGroup;
+ private volatile ICacheEntryGroup cacheEntryGroup;
private V value;
@@ -132,6 +132,11 @@ public class FIFOCacheEntryManager<FK, SK, V>
return cacheEntryGroup;
}
+ @Override
+ public void setBelongedGroup(ICacheEntryGroup belongedGroup) {
+ this.cacheEntryGroup = belongedGroup;
+ }
+
@Override
public void replaceValue(V newValue) {
this.value = newValue;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java
index 87fc118c613..655b8c6e56a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java
@@ -35,5 +35,7 @@ interface ICacheEntry<SK, V> {
ICacheEntryGroup getBelongedGroup();
+ void setBelongedGroup(ICacheEntryGroup belongedGroup);
+
void replaceValue(V newValue);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
index e5f8ff87496..a350c7c98ce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
@@ -103,7 +103,7 @@ class LRUCacheEntryManager<FK, SK, V>
static class LRUCacheEntry<SK, V> implements ICacheEntry<SK, V> {
private final SK secondKey;
- private final ICacheEntryGroup cacheEntryGroup;
+ private volatile ICacheEntryGroup cacheEntryGroup;
private V value;
@@ -131,6 +131,11 @@ class LRUCacheEntryManager<FK, SK, V>
return cacheEntryGroup;
}
+ @Override
+ public void setBelongedGroup(ICacheEntryGroup belongedGroup) {
+ this.cacheEntryGroup = belongedGroup;
+ }
+
@Override
public void replaceValue(V newValue) {
this.value = newValue;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java
index 678c7e8b430..6790f2cb645 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java
@@ -43,7 +43,7 @@ public class DataNodeLastCacheManager {
return null;
}
ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer();
- return lastCacheContainer.getCachedLast();
+ return lastCacheContainer == null ? null :
lastCacheContainer.getCachedLast();
}
/**
@@ -63,8 +63,6 @@ public class DataNodeLastCacheManager {
if (!CACHE_ENABLED || null == entry) {
return 0;
}
- ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer();
- return lastCacheContainer.updateCachedLast(
- timeValuePair, highPriorityUpdate, latestFlushedTime);
+ return entry.updateLastCache(timeValuePair, highPriorityUpdate,
latestFlushedTime);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
index 2db2b717a31..4cef9502f8e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
@@ -189,9 +189,7 @@ public class DualKeyCacheTest {
}
});
int tmp = SchemaCacheEntry.estimateSize(schemaCacheEntry);
- schemaCacheEntry
- .getLastCacheContainer()
- .updateCachedLast(new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)),
true, 0L);
+ schemaCacheEntry.updateLastCache(new TimeValuePair(1L, new
TsPrimitiveType.TsInt(1)), true, 0L);
expectedSize += (SchemaCacheEntry.estimateSize(schemaCacheEntry) - tmp) *
2;
Assert.assertEquals(expectedSize, dualKeyCache.stats().memoryUsage());
}