This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a4eeeba9759 SchemaCache supports precise eviction
a4eeeba9759 is described below
commit a4eeeba97590bddbc026fed890e689aca8cdeb07
Author: Chen YZ <[email protected]>
AuthorDate: Thu Sep 28 09:32:30 2023 +0800
SchemaCache supports precise eviction
---
.../impl/DataNodeInternalRPCServiceImpl.java | 6 +-
.../analyze/cache/schema/DataNodeSchemaCache.java | 21 +++
.../schema/DeviceUsingTemplateSchemaCache.java | 18 ++
.../cache/schema/TimeSeriesSchemaCache.java | 9 +
.../cache/schema/dualkeycache/IDualKeyCache.java | 22 +++
.../dualkeycache/impl/CacheEntryGroupImpl.java | 6 +
.../schema/dualkeycache/impl/DualKeyCacheImpl.java | 90 ++++++++++
.../dualkeycache/impl/FIFOCacheEntryManager.java | 53 +++---
.../schema/dualkeycache/impl/ICacheEntryGroup.java | 4 +
.../dualkeycache/impl/ICacheEntryManager.java | 2 +
.../dualkeycache/impl/LRUCacheEntryManager.java | 79 ++++-----
.../db/storageengine/dataregion/DataRegion.java | 3 +-
.../cache/dualkeycache/DualKeyCacheTest.java | 181 +++++++++++++++++++++
13 files changed, 423 insertions(+), 71 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 2c777218dbf..084662c9f8a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -444,7 +444,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
DataNodeSchemaCache.getInstance().takeWriteLock();
try {
- DataNodeSchemaCache.getInstance().invalidateAll();
+ // req.getFullPath() is a database path
+ DataNodeSchemaCache.getInstance().invalidate(req.getFullPath());
ClusterTemplateManager.getInstance().invalid(req.getFullPath());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
@@ -515,8 +516,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
DataNodeSchemaCache cache = DataNodeSchemaCache.getInstance();
cache.takeWriteLock();
try {
- // todo implement precise timeseries clean rather than clean all
- cache.invalidateAll();
+
cache.invalidate(PathPatternTree.deserialize(req.pathPatternTree).getAllPathPatterns());
} finally {
cache.releaseWriteLock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
index 7b50491ebc0..1f3fde40369 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
@@ -250,6 +250,27 @@ public class DataNodeSchemaCache {
}
}
+ /**
+ * Invalidates the cached schema of the given database in both the template-device cache and the
+ * time series cache.
+ *
+ * @param database full path of the database to invalidate
+ */
+ public void invalidate(String database) {
+ deviceUsingTemplateSchemaCache.invalidateCache(database);
+ timeSeriesSchemaCache.invalidate(database);
+ }
+
+  /**
+   * Invalidates the cache entries covered by the given path patterns.
+   *
+   * <p>Precise invalidation is only attempted when none of the patterns has a wildcard in its
+   * device part; otherwise the whole cache is invalidated.
+   *
+   * @param partialPathList path patterns whose cached schema should be dropped
+   */
+  public void invalidate(List<PartialPath> partialPathList) {
+    boolean anyWildcardDevice =
+        partialPathList.stream().anyMatch(pattern -> pattern.getDevicePath().hasWildcard());
+    if (anyWildcardDevice) {
+      // A wildcard in the device part cannot be resolved to concrete cache keys here.
+      invalidateAll();
+      return;
+    }
+    deviceUsingTemplateSchemaCache.invalidateCache(partialPathList);
+    timeSeriesSchemaCache.invalidate(partialPathList);
+  }
+
public void invalidateAll() {
deviceUsingTemplateSchemaCache.invalidateCache();
timeSeriesSchemaCache.invalidateAll();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceUsingTemplateSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceUsingTemplateSchemaCache.java
index ac62c1d1331..56275c9ecf1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceUsingTemplateSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceUsingTemplateSchemaCache.java
@@ -210,6 +210,24 @@ public class DeviceUsingTemplateSchemaCache {
cache.invalidateAll();
}
+  /**
+   * Invalidates every cached device whose path starts with the given database, as decided by
+   * {@code PartialPath#startsWith}.
+   *
+   * @param database full path of the database to invalidate
+   */
+  public void invalidateCache(String database) {
+    cache.asMap().keySet().stream()
+        .filter(cachedDevice -> cachedDevice.startsWith(database))
+        .forEach(cachedDevice -> cache.invalidate(cachedDevice));
+  }
+
+  /**
+   * Invalidates, for each given path, every cached device whose path starts with the device part
+   * of that path, as decided by {@code PartialPath#startsWith}.
+   *
+   * @param partialPathList paths whose devices should be dropped from the cache
+   */
+  public void invalidateCache(List<PartialPath> partialPathList) {
+    for (PartialPath pattern : partialPathList) {
+      cache.asMap().keySet().stream()
+          .filter(cachedDevice -> cachedDevice.startsWith(pattern.getDevice()))
+          .forEach(cachedDevice -> cache.invalidate(cachedDevice));
+    }
+  }
+
private static class DeviceCacheEntry {
private final String database;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
index a1b8f508f7e..5613a7c8cdc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
@@ -55,6 +55,7 @@ public class TimeSeriesSchemaCache {
private static final Logger logger =
LoggerFactory.getLogger(DataNodeSchemaCache.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ // <device, measurement, entry>
private final IDualKeyCache<PartialPath, String, SchemaCacheEntry>
dualKeyCache;
TimeSeriesSchemaCache() {
@@ -388,6 +389,14 @@ public class TimeSeriesSchemaCache {
dualKeyCache.invalidateAll();
}
+ /**
+ * Invalidates the cached time series schema of the given database by delegating to the dual key
+ * cache.
+ *
+ * @param database full path of the database to invalidate
+ */
+ public void invalidate(String database) {
+ dualKeyCache.invalidate(database);
+ }
+
+ /**
+ * Invalidates the cached time series schema matched by the given paths by delegating to the dual
+ * key cache.
+ *
+ * @param partialPathList paths or path patterns to invalidate
+ */
+ public void invalidate(List<PartialPath> partialPathList) {
+ dualKeyCache.invalidate(partialPathList);
+ }
+
public void cleanUp() {
dualKeyCache.cleanUp();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
index f03e3fae073..0fcf11b37fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
@@ -19,8 +19,13 @@
package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
+
import javax.annotation.concurrent.GuardedBy;
+import java.util.List;
+
/**
* This interfaces defines the behaviour of a dual key cache. A dual key cache
supports manage cache
* values via two keys, first key and second key. Simply, the structure is
like fk -> sk-> value.
@@ -56,6 +61,20 @@ public interface IDualKeyCache<FK, SK, V> {
@GuardedBy("DataNodeSchemaCache#writeLock")
void invalidateAll();
+ /**
+ * Invalidate the cache values that belong to the given database and clear the related cache
+ * keys. The cache status and statistics won't be cleared and they can still be accessed via
+ * cache.stats().
+ *
+ * @param database full path of the database whose cached entries should be dropped
+ */
+ @GuardedBy("DataNodeSchemaCache#writeLock")
+ void invalidate(String database);
+
+ /**
+ * Invalidate the cache values matched by the given paths or path patterns and clear the related
+ * cache keys. The cache status and statistics won't be cleared and they can still be accessed
+ * via cache.stats().
+ *
+ * @param partialPathList paths or path patterns whose cached entries should be dropped
+ */
+ @GuardedBy("DataNodeSchemaCache#writeLock")
+ void invalidate(List<PartialPath> partialPathList);
+
/**
* Clean up all data and info of this cache, including cache keys, cache
values and cache stats.
*/
@@ -64,4 +83,7 @@ public interface IDualKeyCache<FK, SK, V> {
/** Return all the current cache status and statistics. */
IDualKeyCacheStats stats();
+
+ /** Evict a single cache entry. Intended for tests only. */
+ @TestOnly
+ void evictOneEntry();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
index 30fcb080ae9..b15f6d153fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
@@ -19,6 +19,7 @@
package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,6 +46,11 @@ public class CacheEntryGroupImpl<FK, SK, V, T extends
ICacheEntry<SK, V>>
return secondKey == null ? null : cacheEntryMap.get(secondKey);
}
+ /** Returns an iterator over the (second key, cache entry) pairs held by this group. */
+ @Override
+ public Iterator<Map.Entry<SK, T>> getAllCacheEntries() {
+ return cacheEntryMap.entrySet().iterator();
+ }
+
@Override
public T computeCacheEntry(SK secondKey, BiFunction<SK, T, T> computation) {
return cacheEntryMap.compute(secondKey, computation);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
index c02a3173ccd..96bc7fd7408 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
@@ -19,15 +19,23 @@
package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternUtil;
+import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCache;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCacheComputation;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCacheStats;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCacheUpdating;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
+import java.util.function.Function;
class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
implements IDualKeyCache<FK, SK, V> {
@@ -237,6 +245,65 @@ class DualKeyCacheImpl<FK, SK, V, T extends
ICacheEntry<SK, V>>
executeInvalidateAll();
}
+  /**
+   * Invalidates every cache entry whose first key (device) belongs to the given database and
+   * subtracts the estimated size of the removed keys and values from the memory statistics.
+   *
+   * <p>A device belongs to the database when its string form equals the database path or starts
+   * with the database path followed by the path separator. Matching on a node boundary keeps
+   * {@code root.db1} from evicting the devices of a sibling database such as {@code root.db10}.
+   *
+   * @param database full path of the database to invalidate
+   */
+  @Override
+  public void invalidate(String database) {
+    // Prefix of every path strictly below the database node.
+    String childPrefix = database + ".";
+    int estimateSize = 0;
+    for (FK device : firstKeyMap.getAllKeys()) {
+      String devicePath = device.toString();
+      if (!devicePath.equals(database) && !devicePath.startsWith(childPrefix)) {
+        continue;
+      }
+      estimateSize += sizeComputer.computeFirstKeySize(device);
+      ICacheEntryGroup<FK, SK, V, T> entryGroup = firstKeyMap.get(device);
+      for (Iterator<Map.Entry<SK, T>> it = entryGroup.getAllCacheEntries(); it.hasNext(); ) {
+        Map.Entry<SK, T> entry = it.next();
+        estimateSize += sizeComputer.computeSecondKeySize(entry.getKey());
+        estimateSize += sizeComputer.computeValueSize(entry.getValue().getValue());
+        // Detach the entry from the eviction bookkeeping so it cannot be evicted a second time.
+        cacheEntryManager.invalid(entry.getValue());
+      }
+      // Dropping the whole group removes all of its second keys at once.
+      firstKeyMap.remove(device);
+    }
+    cacheStats.decreaseMemoryUsage(estimateSize);
+  }
+
+  /**
+   * Invalidates the cache entries matched by the given paths or path patterns and subtracts the
+   * estimated size of the removed keys and values from the memory statistics.
+   *
+   * <p>For a pattern whose last node is the multi-level wildcard, every entry of the pattern's
+   * device and of all devices below it is removed. Otherwise only the entries of exactly that
+   * device whose second key matches the last node are removed. A first key is removed once none
+   * of its entries survives.
+   *
+   * @param partialPathList paths or path patterns to invalidate
+   */
+  @Override
+  public void invalidate(List<PartialPath> partialPathList) {
+    int estimateSize = 0;
+    for (PartialPath path : partialPathList) {
+      String measurement = path.getMeasurement();
+      // Resolved once per pattern instead of once per cached device.
+      String devicePath = path.getDevicePath().getFullPath();
+      Function<FK, Boolean> deviceFilter;
+      Function<SK, Boolean> measurementFilter;
+      if (PathPatternUtil.isMultiLevelMatchWildcard(measurement)) {
+        // Match on a node boundary so that root.db1.** does not also hit root.db10.*.
+        String childPrefix = devicePath + ".";
+        deviceFilter =
+            d -> {
+              String cachedDevice = d.toString();
+              return cachedDevice.equals(devicePath) || cachedDevice.startsWith(childPrefix);
+            };
+        measurementFilter = m -> true;
+      } else {
+        deviceFilter = d -> d.toString().equals(devicePath);
+        measurementFilter = m -> PathPatternUtil.isNodeMatch(measurement, m.toString());
+      }
+      for (FK device : firstKeyMap.getAllKeys()) {
+        if (!Boolean.TRUE.equals(deviceFilter.apply(device))) {
+          continue;
+        }
+        boolean allSKInvalid = true;
+        ICacheEntryGroup<FK, SK, V, T> entryGroup = firstKeyMap.get(device);
+        for (Iterator<Map.Entry<SK, T>> it = entryGroup.getAllCacheEntries(); it.hasNext(); ) {
+          Map.Entry<SK, T> entry = it.next();
+          if (Boolean.TRUE.equals(measurementFilter.apply(entry.getKey()))) {
+            estimateSize += sizeComputer.computeSecondKeySize(entry.getKey());
+            estimateSize += sizeComputer.computeValueSize(entry.getValue().getValue());
+            // Detach from the eviction bookkeeping before dropping the mapping itself.
+            cacheEntryManager.invalid(entry.getValue());
+            it.remove();
+          } else {
+            allSKInvalid = false;
+          }
+        }
+        if (allSKInvalid) {
+          // No entry survived, so the first key itself is released as well.
+          firstKeyMap.remove(device);
+          estimateSize += sizeComputer.computeFirstKeySize(device);
+        }
+      }
+    }
+
+    cacheStats.decreaseMemoryUsage(estimateSize);
+  }
+
private void executeInvalidateAll() {
firstKeyMap.clear();
cacheEntryManager.cleanUp();
@@ -254,6 +321,12 @@ class DualKeyCacheImpl<FK, SK, V, T extends
ICacheEntry<SK, V>>
return cacheStats;
}
+ /**
+ * Evicts one cache entry and subtracts the amount reported by evictOneCacheEntry() from the
+ * recorded memory usage. Test only.
+ */
+ @Override
+ @TestOnly
+ public void evictOneEntry() {
+ cacheStats.decreaseMemoryUsage(evictOneCacheEntry());
+ }
+
/**
* Since the capacity of one instance of ConcurrentHashMap is about 4
million, a number of
* instances are united for more capacity.
@@ -268,6 +341,10 @@ class DualKeyCacheImpl<FK, SK, V, T extends
ICacheEntry<SK, V>>
return getBelongedMap(key).get(key);
}
+ /** Removes the given key from the segment map it belongs to and returns that map's result. */
+ V remove(K key) {
+ return getBelongedMap(key).remove(key);
+ }
+
V compute(K key, BiFunction<? super K, ? super V, ? extends V>
remappingFunction) {
return getBelongedMap(key).compute(key, remappingFunction);
}
@@ -295,5 +372,18 @@ class DualKeyCacheImpl<FK, SK, V, T extends
ICacheEntry<SK, V>>
}
return map;
}
+
+    /**
+     * Collects the keys of every segment map that has been created so far into a new list.
+     * Segments that are still {@code null} are skipped.
+     */
+    List<K> getAllKeys() {
+      List<K> allKeys = new ArrayList<>();
+      for (Map<K, V> segment : maps) {
+        if (segment == null) {
+          continue;
+        }
+        allKeys.addAll(segment.keySet());
+      }
+      return allKeys;
+    }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
index 661e559dd82..b19b1f4b4de 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
@@ -49,6 +49,14 @@ public class FIFOCacheEntryManager<FK, SK, V>
getNextList(cachePutRoundRobinIndex).add(cacheEntry);
}
+  /**
+   * Unlinks the given entry from the linked list it is chained into, so that it is no longer a
+   * candidate for eviction.
+   *
+   * <p>An entry whose links are already {@code null} has been unlinked before (for example by
+   * {@code evict()}) and is left untouched instead of failing with a NullPointerException.
+   *
+   * @param cacheEntry the entry to detach
+   */
+  @Override
+  public void invalid(FIFOCacheEntry<SK, V> cacheEntry) {
+    if (cacheEntry.next == null || cacheEntry.pre == null) {
+      // this cache entry has already been evicted or invalidated
+      return;
+    }
+    cacheEntry.next.pre = cacheEntry.pre;
+    cacheEntry.pre.next = cacheEntry.next;
+    cacheEntry.next = null;
+    cacheEntry.pre = null;
+  }
+
@Override
public FIFOCacheEntry<SK, V> evict() {
int startIndex = getNextIndex(cacheEvictRoundRobinIndex);
@@ -109,7 +117,8 @@ public class FIFOCacheEntryManager<FK, SK, V>
private V value;
- private FIFOCacheEntry<SK, V> pre;
+ private FIFOCacheEntry<SK, V> pre = null;
+ private FIFOCacheEntry<SK, V> next = null;
private FIFOCacheEntry(SK secondKey, V value, ICacheEntryGroup
cacheEntryGroup) {
this.secondKey = secondKey;
@@ -157,37 +166,35 @@ public class FIFOCacheEntryManager<FK, SK, V>
}
}
- private static class FIFOLinkedList {
+ private static class FIFOLinkedList<SK, V> {
- private FIFOCacheEntry head;
- private FIFOCacheEntry tail;
+ // head.next is the newest
+ private final FIFOCacheEntry head;
+ private final FIFOCacheEntry tail;
- synchronized void add(FIFOCacheEntry cacheEntry) {
- if (head == null) {
- head = cacheEntry;
- tail = cacheEntry;
- return;
- }
-
- head.pre = cacheEntry;
+ public FIFOLinkedList() {
+ head = new FIFOCacheEntry(null, null, null);
+ tail = new FIFOCacheEntry(null, null, null);
+ head.next = tail;
+ tail.pre = head;
+ }
- head = cacheEntry;
+ synchronized void add(FIFOCacheEntry cacheEntry) {
+ cacheEntry.next = head.next;
+ cacheEntry.pre = head;
+ head.next.pre = cacheEntry;
+ head.next = cacheEntry;
}
synchronized FIFOCacheEntry evict() {
- if (tail == null) {
+ if (tail.pre == head) {
return null;
}
-
- FIFOCacheEntry cacheEntry = tail;
- tail = tail.pre;
-
- if (tail == null) {
- head = null;
- }
-
+ FIFOCacheEntry cacheEntry = tail.pre;
+ cacheEntry.pre.next = cacheEntry.next;
+ cacheEntry.next.pre = cacheEntry.pre;
+ cacheEntry.next = null;
cacheEntry.pre = null;
-
return cacheEntry;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
index 70346dfb1f4..89f562e3132 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
@@ -19,6 +19,8 @@
package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl;
+import java.util.Iterator;
+import java.util.Map;
import java.util.function.BiFunction;
/**
@@ -36,6 +38,8 @@ interface ICacheEntryGroup<FK, SK, V, T extends
ICacheEntry<SK, V>> {
T getCacheEntry(SK secondKey);
+ /** Returns an iterator over all (second key, cache entry) pairs of this group. */
+ Iterator<Map.Entry<SK, T>> getAllCacheEntries();
+
T computeCacheEntry(SK secondKey, BiFunction<SK, T, T> computation);
T removeCacheEntry(SK secondKey);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryManager.java
index 8f850e10ee6..a43e612b0e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryManager.java
@@ -36,6 +36,8 @@ interface ICacheEntryManager<FK, SK, V, T extends
ICacheEntry<SK, V>> {
void put(T cacheEntry);
+ /** Removes the given cache entry from this manager's eviction bookkeeping. */
+ void invalid(T cacheEntry);
+
T evict();
void cleanUp();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
index a350c7c98ce..7a3ae5dd2a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
@@ -54,6 +54,14 @@ class LRUCacheEntryManager<FK, SK, V>
getBelongedList(cacheEntry).add(cacheEntry);
}
+  /**
+   * Unlinks the given entry from the linked list it is chained into, so that it is no longer a
+   * candidate for eviction.
+   *
+   * <p>An entry whose links are already {@code null} has been unlinked before (for example by
+   * {@code evict()}) and is left untouched, mirroring the guard in {@code moveToHead}.
+   *
+   * @param cacheEntry the entry to detach
+   */
+  @Override
+  public void invalid(LRUCacheEntry<SK, V> cacheEntry) {
+    if (cacheEntry.next == null || cacheEntry.pre == null) {
+      // this cache entry has already been evicted or invalidated
+      return;
+    }
+    cacheEntry.next.pre = cacheEntry.pre;
+    cacheEntry.pre.next = cacheEntry.next;
+    cacheEntry.next = null;
+    cacheEntry.pre = null;
+  }
+
@Override
public LRUCacheEntry<SK, V> evict() {
int startIndex = idxGenerator.nextInt(SLOT_NUM);
@@ -158,64 +166,49 @@ class LRUCacheEntryManager<FK, SK, V>
private static class LRULinkedList {
- private LRUCacheEntry head;
- private LRUCacheEntry tail;
-
- synchronized void add(LRUCacheEntry cacheEntry) {
- if (head == null) {
- head = cacheEntry;
- tail = cacheEntry;
- return;
- }
+ // head.next is the most recently used entry
+ private final LRUCacheEntry head;
+ private final LRUCacheEntry tail;
- head.pre = cacheEntry;
- cacheEntry.next = head;
+ public LRULinkedList() {
+ head = new LRUCacheEntry(null, null, null);
+ tail = new LRUCacheEntry(null, null, null);
+ head.next = tail;
+ tail.pre = head;
+ }
- head = cacheEntry;
+ synchronized void add(LRUCacheEntry cacheEntry) {
+ cacheEntry.next = head.next;
+ cacheEntry.pre = head;
+ head.next.pre = cacheEntry;
+ head.next = cacheEntry;
}
synchronized LRUCacheEntry evict() {
- if (tail == null) {
+ if (tail.pre == head) {
return null;
}
-
- LRUCacheEntry cacheEntry = tail;
- tail = tail.pre;
-
- if (tail == null) {
- head = null;
- } else {
- tail.next = null;
- }
-
+ LRUCacheEntry cacheEntry = tail.pre;
+ cacheEntry.pre.next = cacheEntry.next;
+ cacheEntry.next.pre = cacheEntry.pre;
+ cacheEntry.next = null;
cacheEntry.pre = null;
-
return cacheEntry;
}
synchronized void moveToHead(LRUCacheEntry cacheEntry) {
- if (head == null) {
- // this cache entry has been evicted and the cache list is empty
- return;
- }
-
- if (cacheEntry.pre == null) {
- // this entry is head
+ if (cacheEntry.next == null || cacheEntry.pre == null) {
+ // this cache entry has been evicted
return;
}
-
+ // remove cache entry from the list
cacheEntry.pre.next = cacheEntry.next;
-
- if (cacheEntry.next != null) {
- cacheEntry.next.pre = cacheEntry.pre;
- }
-
- cacheEntry.pre = null;
-
- head.pre = cacheEntry;
- cacheEntry.next = head;
-
- head = cacheEntry;
+ cacheEntry.next.pre = cacheEntry.pre;
+ // add cache entry to the head
+ cacheEntry.next = head.next;
+ cacheEntry.pre = head;
+ head.next.pre = cacheEntry;
+ head.next = cacheEntry;
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 0794f469073..a4b5b0db661 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1817,10 +1817,9 @@ public class DataRegion implements IDataRegionForQuery {
Set<PartialPath> devicePaths = new
HashSet<>(pattern.getDevicePathPattern());
// delete Last cache record if necessary
- // todo implement more precise process
DataNodeSchemaCache.getInstance().takeWriteLock();
try {
- DataNodeSchemaCache.getInstance().invalidateAll();
+
DataNodeSchemaCache.getInstance().invalidate(Collections.singletonList(pattern));
} finally {
DataNodeSchemaCache.getInstance().releaseWriteLock();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
index 4cef9502f8e..65a88591d7b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.metadata.cache.dualkeycache;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.SchemaCacheEntry;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCache;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCacheComputation;
@@ -193,4 +195,183 @@ public class DualKeyCacheTest {
expectedSize += (SchemaCacheEntry.estimateSize(schemaCacheEntry) - tmp) *
2;
Assert.assertEquals(expectedSize, dualKeyCache.stats().memoryUsage());
}
+
+ /**
+ * Invalidating database root.db1 must drop every root.db1 entry, keep both root.db2.d1 entries
+ * and leave the memory usage equal to the size of the two survivors; evicting them one by one
+ * must bring the memory usage down to zero.
+ */
+ @Test
+ public void testInvalidDatabase() throws IllegalPathException {
+ IDualKeyCache<PartialPath, String, SchemaCacheEntry> dualKeyCache =
generateCache();
+ dualKeyCache.invalidate("root.db1");
+ Assert.assertNull(dualKeyCache.get(new PartialPath("root.db1.d1"), "s1"));
+ Assert.assertNull(dualKeyCache.get(new PartialPath("root.db1.d1"), "s2"));
+ Assert.assertNull(dualKeyCache.get(new PartialPath("root.db1"), "s11"));
+ Assert.assertNotNull(dualKeyCache.get(new PartialPath("root.db2.d1"),
"s1"));
+ Assert.assertNotNull(dualKeyCache.get(new PartialPath("root.db2.d1"),
"s2"));
+ // one first key plus two (second key, value) pairs remain
+ int expectSize =
+ PartialPath.estimateSize(new PartialPath("root.db2.d1"))
+ + computeStringSize("s1") * 2
+ + SchemaCacheEntry.estimateSize(
+ new SchemaCacheEntry(
+ "root.db1",
+ new MeasurementSchema("s1", TSDataType.INT32),
+ Collections.emptyMap(),
+ false))
+ * 2;
+ Assert.assertEquals(expectSize, dualKeyCache.stats().memoryUsage());
+ dualKeyCache.evictOneEntry();
+ expectSize =
+ PartialPath.estimateSize(new PartialPath("root.db2.d1"))
+ + computeStringSize("s1")
+ + SchemaCacheEntry.estimateSize(
+ new SchemaCacheEntry(
+ "root.db1",
+ new MeasurementSchema("s1", TSDataType.INT32),
+ Collections.emptyMap(),
+ false));
+ Assert.assertEquals(expectSize, dualKeyCache.stats().memoryUsage());
+ dualKeyCache.evictOneEntry();
+ Assert.assertEquals(0, dualKeyCache.stats().memoryUsage());
+ }
+
+ /**
+ * Invalidating root.db2.** together with the exact series root.db1.d1.s1 must drop both
+ * root.db2.d1 entries and only s1 of root.db1.d1; root.db1.d1/s2 and root.db1/s11 must survive
+ * and account for the whole remaining memory usage.
+ */
+ @Test
+ public void testInvalidPathPattern1() throws IllegalPathException {
+ IDualKeyCache<PartialPath, String, SchemaCacheEntry> dualKeyCache =
generateCache();
+ dualKeyCache.invalidate(
+ Arrays.asList(new PartialPath("root.db2.**"), new
PartialPath("root.db1.d1.s1")));
+ Assert.assertNull(dualKeyCache.get(new PartialPath("root.db1.d1"), "s1"));
+ Assert.assertNotNull(dualKeyCache.get(new PartialPath("root.db1.d1"),
"s2"));
+ Assert.assertNotNull(dualKeyCache.get(new PartialPath("root.db1"), "s11"));
+ Assert.assertNull(dualKeyCache.get(new PartialPath("root.db2.d1"), "s1"));
+ Assert.assertNull(dualKeyCache.get(new PartialPath("root.db2.d1"), "s2"));
+ // two first keys, each with one surviving (second key, value) pair
+ int expectSize =
+ PartialPath.estimateSize(new PartialPath("root.db1.d1"))
+ + PartialPath.estimateSize(new PartialPath("root.db1"))
+ + computeStringSize("s1")
+ + computeStringSize("s11")
+ + SchemaCacheEntry.estimateSize(
+ new SchemaCacheEntry(
+ "root.db1",
+ new MeasurementSchema("s1", TSDataType.INT32),
+ Collections.emptyMap(),
+ false))
+ + SchemaCacheEntry.estimateSize(
+ new SchemaCacheEntry(
+ "root.db1",
+ new MeasurementSchema("s11", TSDataType.INT32),
+ Collections.emptyMap(),
+ false));
+ Assert.assertEquals(expectSize, dualKeyCache.stats().memoryUsage());
+ dualKeyCache.evictOneEntry();
+ dualKeyCache.evictOneEntry();
+ Assert.assertEquals(0, dualKeyCache.stats().memoryUsage());
+ }
+
+ /**
+ * Invalidating root.db1.** together with the single-level pattern root.db2.d1.*1 must drop every
+ * root.db1 entry and s1 of root.db2.d1; only root.db2.d1/s2 must survive.
+ */
+ @Test
+ public void testInvalidPathPattern2() throws IllegalPathException {
+ IDualKeyCache<PartialPath, String, SchemaCacheEntry> dualKeyCache =
generateCache();
+ dualKeyCache.invalidate(
+ Arrays.asList(new PartialPath("root.db1.**"), new
PartialPath("root.db2.d1.*1")));
+ Assert.assertNull(dualKeyCache.get(new PartialPath("root.db1.d1"), "s1"));
+ Assert.assertNull(dualKeyCache.get(new PartialPath("root.db1.d1"), "s2"));
+ Assert.assertNull(dualKeyCache.get(new PartialPath("root.db1"), "s11"));
+ Assert.assertNull(dualKeyCache.get(new PartialPath("root.db2.d1"), "s1"));
+ Assert.assertNotNull(dualKeyCache.get(new PartialPath("root.db2.d1"),
"s2"));
+ // a single first key with a single (second key, value) pair remains
+ int expectSize =
+ PartialPath.estimateSize(new PartialPath("root.db2.d1"))
+ + computeStringSize("s2")
+ + SchemaCacheEntry.estimateSize(
+ new SchemaCacheEntry(
+ "root.db2",
+ new MeasurementSchema("s2", TSDataType.INT32),
+ Collections.emptyMap(),
+ false));
+ Assert.assertEquals(expectSize, dualKeyCache.stats().memoryUsage());
+ dualKeyCache.evictOneEntry();
+ Assert.assertEquals(0, dualKeyCache.stats().memoryUsage());
+ }
+
+ /**
+ * Builds the fixture shared by the invalidation tests: a cache using the parameterized eviction
+ * policy that holds root.db1.d1/{s1, s2}, root.db1/s11 and root.db2.d1/{s1, s2}. The entry
+ * root.db1.d1/s2 is put twice. Before returning, the fixture asserts that all five entries are
+ * readable and that the reported memory usage equals the expected sum of key and value sizes.
+ */
+ private IDualKeyCache<PartialPath, String, SchemaCacheEntry> generateCache()
+ throws IllegalPathException {
+ DualKeyCacheBuilder<PartialPath, String, SchemaCacheEntry>
dualKeyCacheBuilder =
+ new DualKeyCacheBuilder<>();
+ IDualKeyCache<PartialPath, String, SchemaCacheEntry> dualKeyCache =
+ dualKeyCacheBuilder
+ .cacheEvictionPolicy(DualKeyCachePolicy.valueOf(policy))
+ .memoryCapacity(2000) // actual threshold is 1600
+ .firstKeySizeComputer(PartialPath::estimateSize)
+ .secondKeySizeComputer(this::computeStringSize)
+ .valueSizeComputer(SchemaCacheEntry::estimateSize)
+ .build();
+ dualKeyCache.put(
+ new PartialPath("root.db1.d1"),
+ "s1",
+ new SchemaCacheEntry(
+ "root.db1",
+ new MeasurementSchema("s1", TSDataType.INT32),
+ Collections.emptyMap(),
+ false));
+ dualKeyCache.put(
+ new PartialPath("root.db1.d1"),
+ "s2",
+ new SchemaCacheEntry(
+ "root.db1",
+ new MeasurementSchema("s1", TSDataType.INT32),
+ Collections.emptyMap(),
+ false));
+ dualKeyCache.put(
+ new PartialPath("root.db1"),
+ "s11",
+ new SchemaCacheEntry(
+ "root.db1",
+ new MeasurementSchema("s11", TSDataType.INT32),
+ Collections.emptyMap(),
+ false));
+ // same (device, measurement) pair as the second put above
+ dualKeyCache.put(
+ new PartialPath("root.db1.d1"),
+ "s2",
+ new SchemaCacheEntry(
+ "root.db1",
+ new MeasurementSchema("s1", TSDataType.INT32),
+ Collections.emptyMap(),
+ false));
+ dualKeyCache.put(
+ new PartialPath("root.db2.d1"),
+ "s1",
+ new SchemaCacheEntry(
+ "root.db2",
+ new MeasurementSchema("s1", TSDataType.INT32),
+ Collections.emptyMap(),
+ false));
+ dualKeyCache.put(
+ new PartialPath("root.db2.d1"),
+ "s2",
+ new SchemaCacheEntry(
+ "root.db2",
+ new MeasurementSchema("s1", TSDataType.INT32),
+ Collections.emptyMap(),
+ false));
+ Assert.assertNotNull(dualKeyCache.get(new PartialPath("root.db1.d1"),
"s1"));
+ Assert.assertNotNull(dualKeyCache.get(new PartialPath("root.db1.d1"),
"s2"));
+ Assert.assertNotNull(dualKeyCache.get(new PartialPath("root.db1"), "s11"));
+ Assert.assertNotNull(dualKeyCache.get(new PartialPath("root.db2.d1"),
"s1"));
+ Assert.assertNotNull(dualKeyCache.get(new PartialPath("root.db2.d1"),
"s2"));
+ // three first keys and five (second key, value) pairs are expected to be accounted for
+ int expectSize =
+ PartialPath.estimateSize(new PartialPath("root.db1.d1")) * 2
+ + PartialPath.estimateSize(new PartialPath("root.db1"))
+ + computeStringSize("s1") * 4
+ + computeStringSize("s11")
+ + SchemaCacheEntry.estimateSize(
+ new SchemaCacheEntry(
+ "root.db1",
+ new MeasurementSchema("s1", TSDataType.INT32),
+ Collections.emptyMap(),
+ false))
+ * 4
+ + SchemaCacheEntry.estimateSize(
+ new SchemaCacheEntry(
+ "root.db1",
+ new MeasurementSchema("s11", TSDataType.INT32),
+ Collections.emptyMap(),
+ false));
+ Assert.assertEquals(expectSize, dualKeyCache.stats().memoryUsage());
+ return dualKeyCache;
+ }
}