This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch NPE-fix-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/NPE-fix-13 by this push:
new 734e65d7d22 refactor
734e65d7d22 is described below
commit 734e65d7d223b9fc12552a02c9479abf400c6cff
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 8 17:10:25 2025 +0800
refactor
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 -
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 13 -
.../protocol/rest/v2/impl/RestApiServiceImpl.java | 27 +-
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 35 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 4 +-
.../analyze/cache/schema/DataNodeSchemaCache.java | 7 -
.../analyze/cache/schema/DeviceSchemaCache.java | 123 +------
.../{IDualKeyCache.java => ICache.java} | 42 +--
.../{IDualKeyCacheStats.java => ICacheStats.java} | 2 +-
.../dualkeycache/IDualKeyCacheComputation.java | 43 ---
...{DualKeyCacheBuilder.java => CacheBuilder.java} | 30 +-
.../dualkeycache/impl/CacheEntryGroupImpl.java | 24 +-
.../cache/schema/dualkeycache/impl/CacheImpl.java | 203 +++++++++++
.../{DualKeyCachePolicy.java => CachePolicy.java} | 2 +-
.../dualkeycache/impl/CacheSizeComputerImpl.java | 14 +-
.../cache/schema/dualkeycache/impl/CacheStats.java | 4 +-
.../schema/dualkeycache/impl/DualKeyCacheImpl.java | 406 ---------------------
.../dualkeycache/impl/FIFOCacheEntryManager.java | 6 +-
.../schema/dualkeycache/impl/ICacheEntryGroup.java | 5 +-
.../dualkeycache/impl/ICacheEntryManager.java | 4 +-
.../dualkeycache/impl/ICacheSizeComputer.java | 4 +-
.../dualkeycache/impl/LRUCacheEntryManager.java | 7 +-
.../schemaengine/schemaregion/ISchemaRegion.java | 3 +-
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 3 +-
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 3 +-
.../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 8 +-
.../db/storageengine/dataregion/DataRegion.java | 2 +-
27 files changed, 297 insertions(+), 742 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index aacf9a58f43..a9d83590e9a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1051,12 +1051,6 @@ public class IoTDBConfig {
/** Policy of DataNodeSchemaCache eviction */
private String dataNodeSchemaCacheEvictionPolicy = "FIFO";
- /**
- * Threshold for cache size in mayEvict. When cache size exceeds this
threshold, the system will
- * compute total memory in each eviction iteration to ensure accurate memory
management.
- */
- private int cacheEvictionMemoryComputationThreshold = 20;
-
private int schemaThreadCount = 5;
private String readConsistencyLevel = "strong";
@@ -3511,15 +3505,6 @@ public class IoTDBConfig {
this.dataNodeSchemaCacheEvictionPolicy = dataNodeSchemaCacheEvictionPolicy;
}
- public int getCacheEvictionMemoryComputationThreshold() {
- return cacheEvictionMemoryComputationThreshold;
- }
-
- public void setCacheEvictionMemoryComputationThreshold(
- int cacheEvictionMemoryComputationThreshold) {
- this.cacheEvictionMemoryComputationThreshold =
cacheEvictionMemoryComputationThreshold;
- }
-
public int getSchemaThreadCount() {
return schemaThreadCount;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 1ae3c4c7fef..f6511600745 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1097,12 +1097,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"datanode_schema_cache_eviction_policy",
conf.getDataNodeSchemaCacheEvictionPolicy()));
- conf.setCacheEvictionMemoryComputationThreshold(
- Integer.parseInt(
- properties.getProperty(
- "cache_eviction_memory_computation_threshold",
-
String.valueOf(conf.getCacheEvictionMemoryComputationThreshold()))));
-
conf.setSchemaThreadCount(
Integer.parseInt(
properties.getProperty(
@@ -2088,13 +2082,6 @@ public class IoTDBDescriptor {
// update trusted_uri_pattern
loadTrustedUriPattern(properties);
- // update cache_eviction_memory_computation_threshold
- conf.setCacheEvictionMemoryComputationThreshold(
- Integer.parseInt(
- properties.getProperty(
- "cache_eviction_memory_computation_threshold",
-
String.valueOf(conf.getCacheEvictionMemoryComputationThreshold()))));
-
// tvlist_sort_threshold
conf.setTVListSortThreshold(
Integer.parseInt(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
index 0a0eb119b13..05fc4b80084 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
@@ -111,7 +111,7 @@ public class RestApiServiceImpl extends RestApiService {
PartialPath prefixPath =
new PartialPath(prefixPathList.getPrefixPaths().toArray(new
String[0]));
- final Map<String, Map<PartialPath, Map<String, TimeValuePair>>>
resultMap = new HashMap<>();
+ final Map<PartialPath, Map<String, TimeValuePair>> resultMap = new
HashMap<>();
final String prefixString = prefixPath.toString();
for (ISchemaRegion region :
SchemaEngine.getInstance().getAllSchemaRegions()) {
@@ -170,21 +170,18 @@ public class RestApiServiceImpl extends RestApiService {
List<Object> timeseries = new ArrayList<>();
List<Object> valueList = new ArrayList<>();
List<Object> dataTypeList = new ArrayList<>();
- for (Map.Entry<String, Map<PartialPath, Map<String, TimeValuePair>>>
entry :
+ for (final Map.Entry<PartialPath, Map<String, TimeValuePair>>
device2MeasurementLastEntry :
resultMap.entrySet()) {
- for (final Map.Entry<PartialPath, Map<String, TimeValuePair>>
device2MeasurementLastEntry :
- entry.getValue().entrySet()) {
- final String deviceWithSeparator =
- device2MeasurementLastEntry.getKey() +
TsFileConstant.PATH_SEPARATOR;
- for (Map.Entry<String, TimeValuePair> measurementEntry :
- device2MeasurementLastEntry.getValue().entrySet()) {
- final TimeValuePair tvPair = measurementEntry.getValue();
- if (tvPair != DeviceLastCache.EMPTY_TIME_VALUE_PAIR) {
- valueList.add(tvPair.getValue().getStringValue());
- dataTypeList.add(tvPair.getValue().getDataType().name());
- targetDataSet.addTimestampsItem(tvPair.getTimestamp());
- timeseries.add(deviceWithSeparator + measurementEntry.getKey());
- }
+ final String deviceWithSeparator =
+ device2MeasurementLastEntry.getKey() +
TsFileConstant.PATH_SEPARATOR;
+ for (Map.Entry<String, TimeValuePair> measurementEntry :
+ device2MeasurementLastEntry.getValue().entrySet()) {
+ final TimeValuePair tvPair = measurementEntry.getValue();
+ if (tvPair != DeviceLastCache.EMPTY_TIME_VALUE_PAIR) {
+ valueList.add(tvPair.getValue().getStringValue());
+ dataTypeList.add(tvPair.getValue().getDataType().name());
+ targetDataSet.addTimestampsItem(tvPair.getTimestamp());
+ timeseries.add(deviceWithSeparator + measurementEntry.getKey());
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 3e1d68ac3f3..93c15f80390 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -818,7 +818,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
"The \"executeFastLastDataQueryForOnePrefixPath\" dos not
support wildcards."));
}
- final Map<String, Map<PartialPath, Map<String, TimeValuePair>>>
resultMap = new HashMap<>();
+ final Map<PartialPath, Map<String, TimeValuePair>> resultMap = new
HashMap<>();
int sensorNum = 0;
final String prefixString = prefixPath.toString();
@@ -839,25 +839,22 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// 2.2 all sensors hit cache, return response ~= 20ms
final TsBlockBuilder builder =
LastQueryUtil.createTsBlockBuilder(sensorNum);
- for (final Map.Entry<String, Map<PartialPath, Map<String,
TimeValuePair>>> result :
+ for (final Map.Entry<PartialPath, Map<String, TimeValuePair>>
device2MeasurementLastEntry :
resultMap.entrySet()) {
- for (final Map.Entry<PartialPath, Map<String, TimeValuePair>>
device2MeasurementLastEntry :
- result.getValue().entrySet()) {
- final String deviceWithSeparator =
- device2MeasurementLastEntry.getKey() +
TsFileConstant.PATH_SEPARATOR;
- for (final Map.Entry<String, TimeValuePair> measurementLastEntry :
- device2MeasurementLastEntry.getValue().entrySet()) {
- final TimeValuePair tvPair = measurementLastEntry.getValue();
- if (tvPair != DeviceLastCache.EMPTY_TIME_VALUE_PAIR) {
- LastQueryUtil.appendLastValue(
- builder,
- tvPair.getTimestamp(),
- new Binary(
- deviceWithSeparator + measurementLastEntry.getKey(),
- TSFileConfig.STRING_CHARSET),
- tvPair.getValue().getStringValue(),
- tvPair.getValue().getDataType().name());
- }
+ final String deviceWithSeparator =
+ device2MeasurementLastEntry.getKey() +
TsFileConstant.PATH_SEPARATOR;
+ for (final Map.Entry<String, TimeValuePair> measurementLastEntry :
+ device2MeasurementLastEntry.getValue().entrySet()) {
+ final TimeValuePair tvPair = measurementLastEntry.getValue();
+ if (tvPair != DeviceLastCache.EMPTY_TIME_VALUE_PAIR) {
+ LastQueryUtil.appendLastValue(
+ builder,
+ tvPair.getTimestamp(),
+ new Binary(
+ deviceWithSeparator + measurementLastEntry.getKey(),
+ TSFileConfig.STRING_CHARSET),
+ tvPair.getValue().getStringValue(),
+ tvPair.getValue().getDataType().name());
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 7d25b5c0d35..e265994a370 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -537,7 +537,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus invalidateLastCache(final String database) {
- DataNodeSchemaCache.getInstance().invalidateDatabaseLastCache(database);
+
DataNodeSchemaCache.getInstance().getDeviceSchemaCache().invalidateLastCache();
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@@ -546,7 +546,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
DataNodeSchemaCache.getInstance().takeWriteLock();
try {
// req.getFullPath() is a database path
-
DataNodeSchemaCache.getInstance().getDeviceSchemaCache().invalidate(req.getFullPath());
+ DataNodeSchemaCache.getInstance().getDeviceSchemaCache().invalidateAll();
ClusterTemplateManager.getInstance().invalid(req.getFullPath());
LOGGER.info("Schema cache of {} has been invalidated",
req.getFullPath());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
index a015f6ebb50..af39cef95e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
@@ -306,13 +306,6 @@ public class DataNodeSchemaCache {
deviceSchemaCache.invalidateLastCache(path.getDevicePath(),
path.getMeasurement());
}
- public void invalidateDatabaseLastCache(final String database) {
- if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- return;
- }
- deviceSchemaCache.invalidateLastCache(database);
- }
-
/**
* Update the {@link DeviceLastCache} in writing. If a measurement is with
all {@code null}s, its
* {@link TimeValuePair}[] shall be {@code null}. For correctness, this will
put the {@link
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
index f13e2030cf9..ecbb2d3aea2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
@@ -19,21 +19,18 @@
package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternUtil;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCache;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl.DualKeyCacheBuilder;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl.DualKeyCachePolicy;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.ICache;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl.CacheBuilder;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl.CachePolicy;
-import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.TimeValuePair;
-import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +43,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Predicate;
import java.util.function.ToIntFunction;
/**
@@ -69,7 +65,7 @@ public class DeviceSchemaCache {
*
* <p>2. Optimize the speed in invalidation by databases for most scenarios.
*/
- private final IDualKeyCache<String, PartialPath, DeviceCacheEntry>
dualKeyCache;
+ private final ICache<PartialPath, DeviceCacheEntry> dualKeyCache;
private final Map<String, String> databasePool = new ConcurrentHashMap<>();
@@ -77,11 +73,9 @@ public class DeviceSchemaCache {
DeviceSchemaCache() {
dualKeyCache =
- new DualKeyCacheBuilder<String, PartialPath, DeviceCacheEntry>()
- .cacheEvictionPolicy(
-
DualKeyCachePolicy.valueOf(config.getDataNodeSchemaCacheEvictionPolicy()))
+ new CacheBuilder<PartialPath, DeviceCacheEntry>()
+
.cacheEvictionPolicy(CachePolicy.valueOf(config.getDataNodeSchemaCacheEvictionPolicy()))
.memoryCapacity(config.getAllocateMemoryForSchemaCache())
- .firstKeySizeComputer(segment -> (int)
RamUsageEstimator.sizeOf(segment))
.secondKeySizeComputer(PartialPath::estimateSize)
.valueSizeComputer(DeviceCacheEntry::estimateSize)
.build();
@@ -99,7 +93,7 @@ public class DeviceSchemaCache {
* hit but result is {@code null}, and the result value otherwise.
*/
public TimeValuePair getLastEntry(final PartialPath device, final String
measurement) {
- final DeviceCacheEntry entry = dualKeyCache.get(getLeadingSegment(device),
device);
+ final DeviceCacheEntry entry = dualKeyCache.get(device);
return Objects.nonNull(entry) ? entry.getTimeValuePair(measurement) : null;
}
@@ -109,8 +103,7 @@ public class DeviceSchemaCache {
* @param device IDeviceID
*/
public void invalidateDeviceLastCache(final PartialPath device) {
- dualKeyCache.update(
- getLeadingSegment(device), device, null, entry ->
-entry.invalidateLastCache(), false);
+ dualKeyCache.update(device, null, entry -> -entry.invalidateLastCache(),
false);
}
public void putDeviceSchema(final String database, final DeviceSchemaInfo
deviceSchemaInfo) {
@@ -118,7 +111,6 @@ public class DeviceSchemaCache {
final String databaseToUse = databasePool.computeIfAbsent(database, k ->
database);
dualKeyCache.update(
- getLeadingSegment(devicePath),
devicePath,
new DeviceCacheEntry(),
entry -> entry.setDeviceSchema(databaseToUse, deviceSchemaInfo),
@@ -126,7 +118,7 @@ public class DeviceSchemaCache {
}
public IDeviceSchema getDeviceSchema(final PartialPath device) {
- final DeviceCacheEntry entry = dualKeyCache.get(getLeadingSegment(device),
device);
+ final DeviceCacheEntry entry = dualKeyCache.get(device);
return Objects.nonNull(entry) ? entry.getDeviceSchema() : null;
}
@@ -141,7 +133,6 @@ public class DeviceSchemaCache {
final String database2Use = databasePool.computeIfAbsent(database, k ->
database);
dualKeyCache.update(
- getLeadingSegment(device),
device,
new DeviceCacheEntry(),
initOrInvalidate
@@ -156,8 +147,7 @@ public class DeviceSchemaCache {
Objects.isNull(timeValuePairs));
}
- public boolean getLastCache(
- final Map<String, Map<PartialPath, Map<String, TimeValuePair>>>
inputMap) {
+ public boolean getLastCache(final Map<PartialPath, Map<String,
TimeValuePair>> inputMap) {
return dualKeyCache.batchApply(inputMap, DeviceCacheEntry::updateInputMap);
}
@@ -168,50 +158,27 @@ public class DeviceSchemaCache {
: entry -> -entry.invalidateLastCache(measurement);
if (!devicePath.hasWildcard()) {
- dualKeyCache.update(getLeadingSegment(devicePath), devicePath, null,
updateFunction, false);
+ dualKeyCache.update(devicePath, null, updateFunction, false);
} else {
// This may take quite a long time to perform, yet we assume that the
"invalidateLastCache" is
// only called by deletions, which has a low frequency; and it has
avoided that
// the un-related paths being cleared, like "root.*.b.c.**" affects
// "root.*.d.c.**", thereby lower the query performance.
dualKeyCache.update(
- segment -> {
- try {
- return devicePath.matchPrefixPath(new PartialPath(segment));
- } catch (final IllegalPathException e) {
- logger.warn(
- "Illegal segmentID {} found in cache when invalidating by
path {}, invalidate it anyway",
- segment,
- devicePath);
- return true;
- }
- },
- cachedDeviceID -> cachedDeviceID.matchFullPath(devicePath),
- updateFunction);
+ cachedDeviceID -> cachedDeviceID.matchFullPath(devicePath),
updateFunction);
}
}
void invalidateCache(
final @Nonnull PartialPath devicePath, final boolean
isMultiLevelWildcardMeasurement) {
if (!devicePath.hasWildcard()) {
- dualKeyCache.invalidate(getLeadingSegment(devicePath), devicePath);
+ dualKeyCache.invalidate(devicePath);
} else {
// This may take quite a long time to perform, yet we assume that the
"invalidateLastCache" is
// only called by deletions, which has a low frequency; and it has
avoided that
// the un-related paths being cleared, like "root.*.b.c.**" affects
// "root.*.d.c.**", thereby lower the query performance.
dualKeyCache.invalidate(
- segment -> {
- try {
- return devicePath.matchPrefixPath(new PartialPath(segment));
- } catch (final IllegalPathException e) {
- logger.warn(
- "Illegal segmentID {} found in cache when invalidating by
path {}, invalidate it anyway",
- segment,
- devicePath);
- return true;
- }
- },
cachedDeviceID ->
isMultiLevelWildcardMeasurement
? devicePath.matchPrefixPath(cachedDeviceID)
@@ -241,44 +208,10 @@ public class DeviceSchemaCache {
return dualKeyCache.stats().entriesCount();
}
- // Note: It might be very slow if the database is too long
- void invalidateLastCache(final @Nonnull String database) {
- Predicate<PartialPath> secondKeyChecker;
- try {
- final PartialPath databasePath = new PartialPath(database);
- secondKeyChecker = device -> device.matchPrefixPath(databasePath);
- } catch (final Exception ignored) {
- secondKeyChecker = device -> device.startsWith(database);
- }
-
- lock.lock();
- try {
- dualKeyCache.update(
- segment -> segment.startsWith(database),
- device -> true,
- entry -> -entry.invalidateLastCache());
- dualKeyCache.update(
- database::startsWith, secondKeyChecker, entry ->
-entry.invalidateLastCache());
- } finally {
- lock.unlock();
- }
- }
-
- // Note: It might be very slow if the database is too long
- public void invalidate(final @Nonnull String database) {
- lock.lock();
- try {
- dualKeyCache.invalidate(segment -> segment.startsWith(database), device
-> true);
- dualKeyCache.invalidate(database::startsWith, device ->
device.startsWith(database));
- } finally {
- lock.unlock();
- }
- }
-
public void invalidateLastCache() {
lock.lock();
try {
- dualKeyCache.update(segment -> true, device -> true, entry ->
-entry.invalidateLastCache());
+ dualKeyCache.update(device -> true, entry ->
-entry.invalidateLastCache());
} finally {
lock.unlock();
}
@@ -287,7 +220,7 @@ public class DeviceSchemaCache {
public void invalidateSchema() {
lock.lock();
try {
- dualKeyCache.update(segment -> true, device -> true, entry ->
-entry.invalidateSchema());
+ dualKeyCache.update(device -> true, entry -> -entry.invalidateSchema());
} finally {
lock.unlock();
}
@@ -301,30 +234,4 @@ public class DeviceSchemaCache {
lock.unlock();
}
}
-
- // Utils of leading segment
-
- public static String getLeadingSegment(final PartialPath device) {
- final String segment;
- int lastSeparatorPos = -1;
- int separatorNum = 0;
-
- final String deviceStr = device.getFullPath();
- for (int i = 0; i < deviceStr.length(); i++) {
- if (deviceStr.charAt(i) == TsFileConstant.PATH_SEPARATOR_CHAR) {
- lastSeparatorPos = i;
- separatorNum++;
- if (separatorNum == 3) {
- break;
- }
- }
- }
- if (lastSeparatorPos == -1) {
- segment = deviceStr;
- } else {
- segment = deviceStr.substring(0, lastSeparatorPos);
- }
-
- return segment;
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/ICache.java
similarity index 65%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/ICache.java
index 1a2d788c9f7..e5eee3d4ab7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/ICache.java
@@ -34,13 +34,13 @@ import java.util.function.ToIntFunction;
* @param <SK> The second key of cache value
* @param <V> The cache value
*/
-public interface IDualKeyCache<FK, SK, V> {
+public interface ICache<SK, V> {
/** Get the cache value with given first key and second key. */
- V get(final FK firstKey, final SK secondKey);
+ V get(final SK secondKey);
<R> boolean batchApply(
- final Map<FK, Map<SK, R>> inputMap, final BiFunction<V, R, Boolean>
mappingFunction);
+ final Map<SK, R> inputMap, final BiFunction<V, R, Boolean>
mappingFunction);
/**
* Update the existing value. The updater shall return the difference caused
by the update,
@@ -53,11 +53,7 @@ public interface IDualKeyCache<FK, SK, V> {
* @param createIfNotExists put the value to cache iff it does not exist,
*/
void update(
- final FK firstKey,
- final SK secondKey,
- final V value,
- final ToIntFunction<V> updater,
- final boolean createIfNotExists);
+ final SK key, final V value, final ToIntFunction<V> updater, final
boolean createIfNotExists);
/**
* Update all the existing value with {@link SK} and a the {@link SK}s
matching the given
@@ -67,21 +63,7 @@ public interface IDualKeyCache<FK, SK, V> {
* <p>Warning: This method is without any locks for performance concerns.
The caller shall ensure
* the concurrency safety for the value update.
*/
- void update(
- final FK firstKey, final Predicate<SK> secondKeyChecker, final
ToIntFunction<V> updater);
-
- /**
- * Update all the existing value with {@link SK} and a the {@link SK}s
matching the given
- * predicate. The updater shall return the difference caused by the update,
because we do not want
- * to call "valueSizeComputer" twice, which may include abundant useless
calculations.
- *
- * <p>Warning: This method is without any locks for performance concerns.
The caller shall ensure
- * the concurrency safety for the value update.
- */
- void update(
- final Predicate<FK> firstKeyChecker,
- final Predicate<SK> secondKeyChecker,
- final ToIntFunction<V> updater);
+ void update(final Predicate<SK> keyChecker, final ToIntFunction<V> updater);
/**
* Invalidate all cache values in the cache and clear related cache keys.
The cache status and
@@ -91,20 +73,12 @@ public interface IDualKeyCache<FK, SK, V> {
void invalidateAll();
/** Return all the current cache status and statistics. */
- IDualKeyCacheStats stats();
-
- /** remove all entries for firstKey */
- @GuardedBy("DataNodeSchemaCache#writeLock")
- void invalidate(final FK firstKey);
+ ICacheStats stats();
/** remove matched entry */
@GuardedBy("DataNodeSchemaCache#writeLock")
- void invalidate(final FK firstKey, final SK secondKey);
-
- @GuardedBy("DataNodeSchemaCache#writeLock")
- void invalidate(final FK firstKey, final Predicate<SK> secondKeyChecker);
+ void invalidate(final SK secondKey);
- /** remove all entries matching the firstKey and the secondKey */
@GuardedBy("DataNodeSchemaCache#writeLock")
- void invalidate(final Predicate<FK> firstKeyChecker, final Predicate<SK>
secondKeyChecker);
+ void invalidate(final Predicate<SK> keyChecker);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCacheStats.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/ICacheStats.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCacheStats.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/ICacheStats.java
index 9247e20a9c4..caeccc1a3a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCacheStats.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/ICacheStats.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache;
/** This interface defines the status and statistics, that will be provided ,
of dual key cache. */
-public interface IDualKeyCacheStats {
+public interface ICacheStats {
/**
* Return the count of recorded requests, since the cache has been utilized
after init or clean
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCacheComputation.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCacheComputation.java
deleted file mode 100644
index f0aa50f769b..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCacheComputation.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache;
-
-/**
- * This interfaces defines the behaviour needed be implemented and executed
during cache value
- * traverse.
- *
- * @param <FK> The first key of target cache values
- * @param <SK> The second key of one target cache value
- * @param <V> The cache value
- */
-public interface IDualKeyCacheComputation<FK, SK, V> {
-
- /** Return the first key of target cache values. */
- FK getFirstKey();
-
- /** Return the second key list of target cache values. */
- SK[] getSecondKeyList();
-
- /**
- * Compute each target cache value. The index is the second key's position
in second key list. The
- * value here is read only.
- */
- void computeValue(int index, V value);
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheBuilder.java
similarity index 68%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheBuilder.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheBuilder.java
index 6192ac1c1f1..2f324c7c687 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheBuilder.java
@@ -19,7 +19,7 @@
package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCache;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.ICache;
import java.util.function.Function;
@@ -30,21 +30,19 @@ import java.util.function.Function;
* @param <SK> The second key of cache value
* @param <V> The cache value
*/
-public class DualKeyCacheBuilder<FK, SK, V> {
+public class CacheBuilder<SK, V> {
- private DualKeyCachePolicy policy;
+ private CachePolicy policy;
private long memoryCapacity;
- private Function<FK, Integer> firstKeySizeComputer;
-
private Function<SK, Integer> secondKeySizeComputer;
private Function<V, Integer> valueSizeComputer;
/** Initiate and return a dual key cache instance. */
- public IDualKeyCache<FK, SK, V> build() {
- ICacheEntryManager<FK, SK, V, ?> cacheEntryManager = null;
+ public ICache<SK, V> build() {
+ ICacheEntryManager<SK, V, ?> cacheEntryManager = null;
switch (policy) {
case LRU:
cacheEntryManager = new LRUCacheEntryManager<>();
@@ -53,38 +51,32 @@ public class DualKeyCacheBuilder<FK, SK, V> {
cacheEntryManager = new FIFOCacheEntryManager<>();
break;
}
- return new DualKeyCacheImpl<>(
+ return new CacheImpl<>(
cacheEntryManager,
- new CacheSizeComputerImpl<>(firstKeySizeComputer,
secondKeySizeComputer, valueSizeComputer),
+ new CacheSizeComputerImpl<>(secondKeySizeComputer, valueSizeComputer),
memoryCapacity);
}
/** Define the cache eviction policy of dual key cache. */
- public DualKeyCacheBuilder<FK, SK, V> cacheEvictionPolicy(DualKeyCachePolicy
policy) {
+ public CacheBuilder<SK, V> cacheEvictionPolicy(CachePolicy policy) {
this.policy = policy;
return this;
}
/** Define the memory capacity of dual key cache. */
- public DualKeyCacheBuilder<FK, SK, V> memoryCapacity(long memoryCapacity) {
+ public CacheBuilder<SK, V> memoryCapacity(long memoryCapacity) {
this.memoryCapacity = memoryCapacity;
return this;
}
- /** Define how to compute the memory usage of a first key in dual key cache.
*/
- public DualKeyCacheBuilder<FK, SK, V> firstKeySizeComputer(Function<FK,
Integer> computer) {
- this.firstKeySizeComputer = computer;
- return this;
- }
-
/** Define how to compute the memory usage of a second key in dual key
cache. */
- public DualKeyCacheBuilder<FK, SK, V> secondKeySizeComputer(Function<SK,
Integer> computer) {
+ public CacheBuilder<SK, V> secondKeySizeComputer(Function<SK, Integer>
computer) {
this.secondKeySizeComputer = computer;
return this;
}
/** Define how to compute the memory usage of a cache value in dual key
cache. */
- public DualKeyCacheBuilder<FK, SK, V> valueSizeComputer(Function<V, Integer>
computer) {
+ public CacheBuilder<SK, V> valueSizeComputer(Function<V, Integer> computer) {
this.valueSizeComputer = computer;
return this;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
index ea7f502a90b..19123bb4b53 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
@@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
-public class CacheEntryGroupImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
- implements ICacheEntryGroup<FK, SK, V, T> {
+public class CacheEntryGroupImpl<SK, V, T extends ICacheEntry<SK, V>>
+ implements ICacheEntryGroup<SK, V, T> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(CacheEntryGroupImpl.class)
@@ -39,21 +39,13 @@ public class CacheEntryGroupImpl<FK, SK, V, T extends
ICacheEntry<SK, V>>
// Calculate the outer entry of the "firstKeyMap" here
+ RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
- private final FK firstKey;
-
private final Map<SK, T> cacheEntryMap = new ConcurrentHashMap<>();
- private final ICacheSizeComputer<FK, SK, V> sizeComputer;
+ private final ICacheSizeComputer<SK, V> sizeComputer;
private final AtomicLong memory;
- CacheEntryGroupImpl(final FK firstKey, final ICacheSizeComputer<FK, SK, V>
sizeComputer) {
- this.firstKey = firstKey;
+ CacheEntryGroupImpl(final ICacheSizeComputer<SK, V> sizeComputer) {
this.sizeComputer = sizeComputer;
- this.memory = new AtomicLong(INSTANCE_SIZE +
sizeComputer.computeFirstKeySize(firstKey));
- }
-
- @Override
- public FK getFirstKey() {
- return firstKey;
+ this.memory = new AtomicLong(INSTANCE_SIZE);
}
@Override
@@ -115,12 +107,12 @@ public class CacheEntryGroupImpl<FK, SK, V, T extends
ICacheEntry<SK, V>>
if (o == null || getClass() != o.getClass()) {
return false;
}
- final CacheEntryGroupImpl<?, ?, ?, ?> that = (CacheEntryGroupImpl<?, ?, ?,
?>) o;
- return Objects.equals(firstKey, that.firstKey);
+ final CacheEntryGroupImpl<?, ?, ?> that = (CacheEntryGroupImpl<?, ?, ?>) o;
+ return Objects.equals(cacheEntryMap, that.cacheEntryMap);
}
@Override
public int hashCode() {
- return firstKey.hashCode();
+ return cacheEntryMap.hashCode();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheImpl.java
new file mode 100644
index 00000000000..16304d94d6b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheImpl.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl;
+
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.ICache;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.ICacheStats;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import javax.annotation.Nonnull;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import java.util.function.ToIntFunction;
+
+class CacheImpl<SK, V, T extends ICacheEntry<SK, V>> implements ICache<SK, V> {
+ private final ICacheEntryGroup<SK, V, T> cacheEntryGroup;
+
+ private final ICacheEntryManager<SK, V, T> cacheEntryManager;
+
+ private final ICacheSizeComputer<SK, V> sizeComputer;
+
+ private final CacheStats cacheStats;
+
+ CacheImpl(
+ final ICacheEntryManager<SK, V, T> cacheEntryManager,
+ final ICacheSizeComputer<SK, V> sizeComputer,
+ final long memoryCapacity) {
+ this.cacheEntryManager = cacheEntryManager;
+ this.sizeComputer = sizeComputer;
+ this.cacheStats = new CacheStats(memoryCapacity, this::getMemory,
this::getEntriesCount);
+ this.cacheEntryGroup = new CacheEntryGroupImpl<>(sizeComputer);
+ }
+
+ @Override
+ public V get(final SK secondKey) {
+ final T cacheEntry = cacheEntryGroup.getCacheEntry(secondKey);
+ if (cacheEntry == null) {
+ cacheStats.recordMiss(1);
+ return null;
+ } else {
+ cacheEntryManager.access(cacheEntry);
+ cacheStats.recordHit(1);
+ return cacheEntry.getValue();
+ }
+ }
+
+ @Override
+ public <R> boolean batchApply(
+ final Map<SK, R> inputMap, final BiFunction<V, R, Boolean>
mappingFunction) {
+ for (final Map.Entry<SK, R> skrEntry : inputMap.entrySet()) {
+ final T cacheEntry = cacheEntryGroup.getCacheEntry(skrEntry.getKey());
+ if (cacheEntry == null) {
+ return false;
+ }
+ if (!mappingFunction.apply(cacheEntry.getValue(), skrEntry.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void update(
+ final @Nonnull SK secondKey,
+ final V value,
+ final ToIntFunction<V> updater,
+ final boolean createIfNotExists) {
+
+ final ICacheEntryGroup<SK, V, T> finalCacheEntryGroup = cacheEntryGroup;
+ cacheEntryGroup.computeCacheEntry(
+ secondKey,
+ memory ->
+ (sk, cacheEntry) -> {
+ if (Objects.isNull(cacheEntry)) {
+ if (!createIfNotExists) {
+ return null;
+ }
+ cacheEntry =
+ cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
+ cacheEntryManager.put(cacheEntry);
+ memory.getAndAdd(
+ sizeComputer.computeSecondKeySize(sk)
+ + sizeComputer.computeValueSize(cacheEntry.getValue())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+ }
+ memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ return cacheEntry;
+ });
+
+ mayEvict();
+ }
+
+ @Override
+ public void update(final Predicate<SK> secondKeyChecker, final
ToIntFunction<V> updater) {
+ clearSecondEntry(cacheEntryGroup, secondKeyChecker, updater);
+ mayEvict();
+ }
+
+ public void clearSecondEntry(
+ final ICacheEntryGroup<SK, V, T> entryGroup,
+ final Predicate<SK> secondKeyChecker,
+ final ToIntFunction<V> updater) {
+ if (Objects.nonNull(entryGroup)) {
+ entryGroup
+ .getAllCacheEntries()
+ .forEachRemaining(
+ entry -> {
+ if (!secondKeyChecker.test(entry.getKey())) {
+ return;
+ }
+ entryGroup.computeCacheEntryIfPresent(
+ entry.getKey(),
+ memory ->
+ (secondKey, cacheEntry) -> {
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ return cacheEntry;
+ });
+ });
+ }
+ }
+
+ private void mayEvict() {
+ long exceedMemory;
+ while ((exceedMemory = cacheStats.getExceedMemory()) > 0) {
+ do {
+ exceedMemory -= evictOneCacheEntry();
+ } while (exceedMemory > 0);
+ }
+ }
+
+ // The returned delta may have some error, but it's OK
+ // Because the delta is only for loop round estimation
+ private long evictOneCacheEntry() {
+ final ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
+ if (evictCacheEntry == null) {
+ return 0;
+ }
+
+ final ICacheEntryGroup<SK, V, T> belongedGroup =
evictCacheEntry.getBelongedGroup();
+ evictCacheEntry.setBelongedGroup(null);
+
+ return belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
+ }
+
+ @Override
+ public void invalidateAll() {
+ cacheEntryManager.cleanUp();
+ }
+
+ @Override
+ public ICacheStats stats() {
+ return cacheStats;
+ }
+
+ @Override
+ public void invalidate(final SK secondKey) {
+
+ final T entry = cacheEntryGroup.getCacheEntry(secondKey);
+ if (Objects.nonNull(entry) && cacheEntryManager.invalidate(entry)) {
+ cacheEntryGroup.removeCacheEntry(entry.getSecondKey());
+ }
+ }
+
+ @Override
+ public void invalidate(final Predicate<SK> secondKeyChecker) {
+ for (final Iterator<Map.Entry<SK, T>> it =
cacheEntryGroup.getAllCacheEntries();
+ it.hasNext(); ) {
+ final Map.Entry<SK, T> entry = it.next();
+ if (secondKeyChecker.test(entry.getKey()) &&
cacheEntryManager.invalidate(entry.getValue())) {
+ cacheEntryGroup.removeCacheEntry(entry.getKey());
+ }
+ }
+ }
+
+ private long getMemory() {
+ return cacheEntryGroup.getMemory();
+ }
+
+ private long getEntriesCount() {
+ return cacheEntryGroup.getEntriesCount();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCachePolicy.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CachePolicy.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCachePolicy.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CachePolicy.java
index d1dcc39af27..5f7c8cfdfcb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCachePolicy.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CachePolicy.java
@@ -19,7 +19,7 @@
package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl;
-public enum DualKeyCachePolicy {
+public enum CachePolicy {
LRU,
FIFO;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheSizeComputerImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheSizeComputerImpl.java
index 417a5a64431..0b8732623a5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheSizeComputerImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheSizeComputerImpl.java
@@ -21,28 +21,18 @@ package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.i
import java.util.function.Function;
-class CacheSizeComputerImpl<FK, SK, V> implements ICacheSizeComputer<FK, SK,
V> {
-
- private final Function<FK, Integer> firstKeySizeComputer;
+class CacheSizeComputerImpl<SK, V> implements ICacheSizeComputer<SK, V> {
private final Function<SK, Integer> secondKeySizeComputer;
private final Function<V, Integer> valueSizeComputer;
CacheSizeComputerImpl(
- Function<FK, Integer> firstKeySizeComputer,
- Function<SK, Integer> secondKeySizeComputer,
- Function<V, Integer> valueSizeCompute) {
- this.firstKeySizeComputer = firstKeySizeComputer;
+ Function<SK, Integer> secondKeySizeComputer, Function<V, Integer>
valueSizeCompute) {
this.secondKeySizeComputer = secondKeySizeComputer;
this.valueSizeComputer = valueSizeCompute;
}
- @Override
- public int computeFirstKeySize(FK firstKey) {
- return firstKeySizeComputer.apply(firstKey);
- }
-
@Override
public int computeSecondKeySize(SK secondKey) {
return secondKeySizeComputer.apply(secondKey);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheStats.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheStats.java
index 0628d35db16..9ee968875c1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheStats.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheStats.java
@@ -19,12 +19,12 @@
package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCacheStats;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.ICacheStats;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
-class CacheStats implements IDualKeyCacheStats {
+class CacheStats implements ICacheStats {
// prepare some buffer for high load scenarios
private static final double MEMORY_THRESHOLD_RATIO = 0.8;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
deleted file mode 100644
index 1778920a23a..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCache;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCacheStats;
-
-import org.apache.tsfile.utils.RamUsageEstimator;
-
-import javax.annotation.Nonnull;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
-import java.util.function.ToIntFunction;
-
-class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
- implements IDualKeyCache<FK, SK, V> {
-
- private final SegmentedConcurrentHashMap<FK, ICacheEntryGroup<FK, SK, V, T>>
firstKeyMap =
- new SegmentedConcurrentHashMap<>();
-
- private final ICacheEntryManager<FK, SK, V, T> cacheEntryManager;
-
- private final ICacheSizeComputer<FK, SK, V> sizeComputer;
-
- private final CacheStats cacheStats;
-
- DualKeyCacheImpl(
- final ICacheEntryManager<FK, SK, V, T> cacheEntryManager,
- final ICacheSizeComputer<FK, SK, V> sizeComputer,
- final long memoryCapacity) {
- this.cacheEntryManager = cacheEntryManager;
- this.sizeComputer = sizeComputer;
- this.cacheStats = new CacheStats(memoryCapacity, this::getMemory,
this::getEntriesCount);
- }
-
- @Override
- public V get(final FK firstKey, final SK secondKey) {
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
- if (cacheEntryGroup == null) {
- cacheStats.recordMiss(1);
- return null;
- } else {
- final T cacheEntry = cacheEntryGroup.getCacheEntry(secondKey);
- if (cacheEntry == null) {
- cacheStats.recordMiss(1);
- return null;
- } else {
- cacheEntryManager.access(cacheEntry);
- cacheStats.recordHit(1);
- return cacheEntry.getValue();
- }
- }
- }
-
- @Override
- public <R> boolean batchApply(
- final Map<FK, Map<SK, R>> inputMap, final BiFunction<V, R, Boolean>
mappingFunction) {
- for (final Map.Entry<FK, Map<SK, R>> fkMapEntry : inputMap.entrySet()) {
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(fkMapEntry.getKey());
- if (cacheEntryGroup == null) {
- return false;
- }
- for (final Map.Entry<SK, R> skrEntry : fkMapEntry.getValue().entrySet())
{
- final T cacheEntry = cacheEntryGroup.getCacheEntry(skrEntry.getKey());
- if (cacheEntry == null) {
- return false;
- }
- if (!mappingFunction.apply(cacheEntry.getValue(),
skrEntry.getValue())) {
- return false;
- }
- }
- }
- return true;
- }
-
- @Override
- public void update(
- final FK firstKey,
- final @Nonnull SK secondKey,
- final V value,
- final ToIntFunction<V> updater,
- final boolean createIfNotExists) {
-
- ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
- if (Objects.isNull(cacheEntryGroup)) {
- if (createIfNotExists) {
- cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey, sizeComputer);
- firstKeyMap.put(firstKey, cacheEntryGroup);
- } else {
- return;
- }
- }
-
- final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
- cacheEntryGroup.computeCacheEntry(
- secondKey,
- memory ->
- (sk, cacheEntry) -> {
- if (Objects.isNull(cacheEntry)) {
- if (!createIfNotExists) {
- return null;
- }
- cacheEntry =
- cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
- cacheEntryManager.put(cacheEntry);
- memory.getAndAdd(
- sizeComputer.computeSecondKeySize(sk)
- + sizeComputer.computeValueSize(cacheEntry.getValue())
- + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
- }
- memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
- return cacheEntry;
- });
-
- mayEvict();
- }
-
- @Override
- public void update(
- final FK firstKey, final Predicate<SK> secondKeyChecker, final
ToIntFunction<V> updater) {
- clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater);
- mayEvict();
- }
-
- @Override
- public void update(
- final Predicate<FK> firstKeyChecker,
- final Predicate<SK> secondKeyChecker,
- final ToIntFunction<V> updater) {
- for (final FK firstKey : firstKeyMap.getAllKeys()) {
- if (!firstKeyChecker.test(firstKey)) {
- continue;
- }
- clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater);
- }
- mayEvict();
- }
-
- public void clearSecondEntry(
- final ICacheEntryGroup<FK, SK, V, T> entryGroup,
- final Predicate<SK> secondKeyChecker,
- final ToIntFunction<V> updater) {
- if (Objects.nonNull(entryGroup)) {
- entryGroup
- .getAllCacheEntries()
- .forEachRemaining(
- entry -> {
- if (!secondKeyChecker.test(entry.getKey())) {
- return;
- }
- entryGroup.computeCacheEntryIfPresent(
- entry.getKey(),
- memory ->
- (secondKey, cacheEntry) -> {
-
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
- return cacheEntry;
- });
- });
- }
- }
-
- private void mayEvict() {
- long exceedMemory;
- final int threshold =
-
IoTDBDescriptor.getInstance().getConfig().getCacheEvictionMemoryComputationThreshold();
- while ((exceedMemory = cacheStats.getExceedMemory()) > 0) {
- // Not compute each time to save time when FK is too many
- do {
- exceedMemory -= evictOneCacheEntry();
- } while (exceedMemory > 0 && firstKeyMap.size() > threshold);
- }
- }
-
- // The returned delta may have some error, but it's OK
- // Because the delta is only for loop round estimation
- private long evictOneCacheEntry() {
- final ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
- if (evictCacheEntry == null) {
- return 0;
- }
-
- final ICacheEntryGroup<FK, SK, V, T> belongedGroup =
evictCacheEntry.getBelongedGroup();
- evictCacheEntry.setBelongedGroup(null);
-
- long memory =
belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
-
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
- firstKeyMap.get(belongedGroup.getFirstKey());
- if (Objects.nonNull(cacheEntryGroup) && cacheEntryGroup.isEmpty()) {
- // The removal is non-atomic, but it's ok because it's just a cache and
does not affect the
- // consistency if you evicts some entries being added
- if (Objects.nonNull(firstKeyMap.remove(belongedGroup.getFirstKey()))) {
- memory +=
- sizeComputer.computeFirstKeySize(belongedGroup.getFirstKey())
- + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
- }
- }
- return memory;
- }
-
- @Override
- public void invalidateAll() {
- firstKeyMap.clear();
- cacheEntryManager.cleanUp();
- }
-
- @Override
- public IDualKeyCacheStats stats() {
- return cacheStats;
- }
-
- @Override
- public void invalidate(final FK firstKey) {
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.remove(firstKey);
- if (cacheEntryGroup != null) {
- for (final Iterator<Map.Entry<SK, T>> it =
cacheEntryGroup.getAllCacheEntries();
- it.hasNext(); ) {
- cacheEntryManager.invalidate(it.next().getValue());
- }
- }
- }
-
- @Override
- public void invalidate(final FK firstKey, final SK secondKey) {
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
- if (Objects.isNull(cacheEntryGroup)) {
- return;
- }
-
- final T entry = cacheEntryGroup.getCacheEntry(secondKey);
- if (Objects.nonNull(entry) && cacheEntryManager.invalidate(entry)) {
- cacheEntryGroup.removeCacheEntry(entry.getSecondKey());
- }
-
- if (cacheEntryGroup.isEmpty()) {
- firstKeyMap.remove(firstKey);
- }
- }
-
- @Override
- public void invalidate(final FK firstKey, final Predicate<SK>
secondKeyChecker) {
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
- if (Objects.isNull(cacheEntryGroup)) {
- return;
- }
- for (final Iterator<Map.Entry<SK, T>> it =
cacheEntryGroup.getAllCacheEntries();
- it.hasNext(); ) {
- final Map.Entry<SK, T> entry = it.next();
- if (secondKeyChecker.test(entry.getKey()) &&
cacheEntryManager.invalidate(entry.getValue())) {
- cacheEntryGroup.removeCacheEntry(entry.getKey());
- }
- }
-
- if (cacheEntryGroup.isEmpty()) {
- firstKeyMap.remove(firstKey);
- }
- }
-
- @Override
- public void invalidate(
- final Predicate<FK> firstKeyChecker, final Predicate<SK>
secondKeyChecker) {
- for (final FK firstKey : firstKeyMap.getAllKeys()) {
- if (!firstKeyChecker.test(firstKey)) {
- continue;
- }
-
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
- if (Objects.isNull(cacheEntryGroup)) {
- return;
- }
-
- for (final Iterator<Map.Entry<SK, T>> it =
cacheEntryGroup.getAllCacheEntries();
- it.hasNext(); ) {
- final Map.Entry<SK, T> entry = it.next();
-
- if (secondKeyChecker.test(entry.getKey())
- && cacheEntryManager.invalidate(entry.getValue())) {
- cacheEntryGroup.removeCacheEntry(entry.getKey());
- }
- }
-
- if (cacheEntryGroup.isEmpty()) {
- firstKeyMap.remove(firstKey);
- }
- }
- }
-
- private long getMemory() {
- long memory = 0;
- for (final Map<FK, ICacheEntryGroup<FK, SK, V, T>> map : firstKeyMap.maps)
{
- if (Objects.nonNull(map)) {
- for (final ICacheEntryGroup<FK, SK, V, T> group : map.values()) {
- memory += group.getMemory();
- }
- }
- }
- return memory;
- }
-
- private long getEntriesCount() {
- long count = 0;
- for (final Map<FK, ICacheEntryGroup<FK, SK, V, T>> map : firstKeyMap.maps)
{
- if (Objects.nonNull(map)) {
- for (final ICacheEntryGroup<FK, SK, V, T> group : map.values()) {
- count += group.getEntriesCount();
- }
- }
- }
- return count;
- }
-
- /**
- * Since the capacity of one instance of ConcurrentHashMap is about 4
million, a number of
- * instances are united for more capacity.
- */
- private static class SegmentedConcurrentHashMap<K, V> {
-
- private static final int SLOT_NUM = 31;
-
- private final Map<K, V>[] maps = new ConcurrentHashMap[SLOT_NUM];
-
- V get(final K key) {
- return getBelongedMap(key).get(key);
- }
-
- V remove(final K key) {
- return getBelongedMap(key).remove(key);
- }
-
- V put(final K key, final V value) {
- return getBelongedMap(key).put(key, value);
- }
-
- void clear() {
- synchronized (maps) {
- Arrays.fill(maps, null);
- }
- }
-
- Map<K, V> getBelongedMap(final K key) {
- int slotIndex = key.hashCode() % SLOT_NUM;
- slotIndex = slotIndex < 0 ? slotIndex + SLOT_NUM : slotIndex;
- Map<K, V> map = maps[slotIndex];
- if (map == null) {
- synchronized (maps) {
- map = maps[slotIndex];
- if (map == null) {
- map = new ConcurrentHashMap<>();
- maps[slotIndex] = map;
- }
- }
- }
- return map;
- }
-
- // Copied list, deletion-safe
- List<K> getAllKeys() {
- final List<K> res = new ArrayList<>();
- Arrays.stream(maps)
- .iterator()
- .forEachRemaining(
- map -> {
- if (map != null) {
- res.addAll(map.keySet());
- }
- });
- return res;
- }
-
- int size() {
- int size = 0;
- for (final Map<K, V> map : maps) {
- if (Objects.nonNull(map)) {
- size += map.size();
- }
- }
- return size;
- }
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
index c8dc64e01f3..b718df94d4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
@@ -23,8 +23,8 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-public class FIFOCacheEntryManager<FK, SK, V>
- implements ICacheEntryManager<FK, SK, V,
FIFOCacheEntryManager.FIFOCacheEntry<SK, V>> {
+public class FIFOCacheEntryManager<SK, V>
+ implements ICacheEntryManager<SK, V,
FIFOCacheEntryManager.FIFOCacheEntry<SK, V>> {
private static final int SLOT_NUM = 128;
@@ -38,7 +38,7 @@ public class FIFOCacheEntryManager<FK, SK, V>
public FIFOCacheEntry<SK, V> createCacheEntry(
final SK secondKey,
final V value,
- final ICacheEntryGroup<FK, SK, V, FIFOCacheEntry<SK, V>>
cacheEntryGroup) {
+ final ICacheEntryGroup<SK, V, FIFOCacheEntry<SK, V>> cacheEntryGroup) {
return new FIFOCacheEntry<>(secondKey, value, cacheEntryGroup);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
index 0791fadc325..4538cd6ad2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
@@ -34,10 +34,7 @@ import java.util.function.Function;
* @param <V> The cache value.
* @param <T> The cache entry holding cache value.
*/
-interface ICacheEntryGroup<FK, SK, V, T extends ICacheEntry<SK, V>> {
-
- FK getFirstKey();
-
+interface ICacheEntryGroup<SK, V, T extends ICacheEntry<SK, V>> {
T getCacheEntry(final SK secondKey);
Iterator<Map.Entry<SK, T>> getAllCacheEntries();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryManager.java
index 0117f836ba9..c9e81116768 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryManager.java
@@ -28,10 +28,10 @@ package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.i
* @param <V> The cache value.
* @param <T> The cache entry holding cache value.
*/
-interface ICacheEntryManager<FK, SK, V, T extends ICacheEntry<SK, V>> {
+interface ICacheEntryManager<SK, V, T extends ICacheEntry<SK, V>> {
T createCacheEntry(
- final SK secondKey, final V value, final ICacheEntryGroup<FK, SK, V, T>
cacheEntryGroup);
+ final SK secondKey, final V value, final ICacheEntryGroup<SK, V, T>
cacheEntryGroup);
void access(final T cacheEntry);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheSizeComputer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheSizeComputer.java
index 86434c2558a..13f15b385c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheSizeComputer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheSizeComputer.java
@@ -19,9 +19,7 @@
package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl;
-interface ICacheSizeComputer<FK, SK, V> {
-
- int computeFirstKeySize(FK firstKey);
+interface ICacheSizeComputer<SK, V> {
int computeSecondKeySize(SK secondKey);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
index 52d2160d6f7..38c0465b9f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
@@ -26,12 +26,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class implements the cache entry manager with LRU policy.
*
- * @param <FK> The first key of cache value.
* @param <SK> The second key of cache value.
* @param <V> The cache value.
*/
-class LRUCacheEntryManager<FK, SK, V>
- implements ICacheEntryManager<FK, SK, V,
LRUCacheEntryManager.LRUCacheEntry<SK, V>> {
+class LRUCacheEntryManager<SK, V>
+ implements ICacheEntryManager<SK, V,
LRUCacheEntryManager.LRUCacheEntry<SK, V>> {
private static final int SLOT_NUM = 128;
@@ -43,7 +42,7 @@ class LRUCacheEntryManager<FK, SK, V>
public LRUCacheEntry<SK, V> createCacheEntry(
final SK secondKey,
final V value,
- final ICacheEntryGroup<FK, SK, V, LRUCacheEntry<SK, V>> cacheEntryGroup)
{
+ final ICacheEntryGroup<SK, V, LRUCacheEntry<SK, V>> cacheEntryGroup) {
return new LRUCacheEntry<>(secondKey, value, cacheEntryGroup);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
index 7afca7bcc58..df69657f1d1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
@@ -317,8 +317,7 @@ public interface ISchemaRegion {
throws MetadataException;
int fillLastQueryMap(
- final PartialPath pattern,
- final Map<String, Map<PartialPath, Map<String, TimeValuePair>>>
mapToFill)
+ final PartialPath pattern, final Map<PartialPath, Map<String,
TimeValuePair>> mapToFill)
throws MetadataException;
// endregion
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 5449d3c48bc..7f8074c61ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -1335,8 +1335,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
@Override
public int fillLastQueryMap(
- final PartialPath pattern,
- final Map<String, Map<PartialPath, Map<String, TimeValuePair>>>
mapToFill)
+ final PartialPath pattern, final Map<PartialPath, Map<String,
TimeValuePair>> mapToFill)
throws MetadataException {
return mtree.fillLastQueryMap(pattern, mapToFill);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index 045d39400c7..fd8d7c5f1b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -1440,8 +1440,7 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
@Override
public int fillLastQueryMap(
- final PartialPath pattern,
- final Map<String, Map<PartialPath, Map<String, TimeValuePair>>>
mapToFill) {
+ final PartialPath pattern, final Map<PartialPath, Map<String,
TimeValuePair>> mapToFill) {
throw new UnsupportedOperationException("Not implemented");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
index fd281f192a5..e82e39736bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
@@ -42,7 +42,6 @@ import
org.apache.iotdb.db.exception.metadata.template.DifferentTemplateExceptio
import
org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DeviceSchemaCache;
import org.apache.iotdb.db.schemaengine.metric.SchemaRegionMemMetric;
import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.IMemMNode;
@@ -1074,8 +1073,7 @@ public class MTreeBelowSGMemoryImpl {
}
public int fillLastQueryMap(
- final PartialPath prefixPath,
- final Map<String, Map<PartialPath, Map<String, TimeValuePair>>>
mapToFill)
+ final PartialPath prefixPath, final Map<PartialPath, Map<String,
TimeValuePair>> mapToFill)
throws MetadataException {
final int[] sensorNum = {0};
try (final EntityUpdater<IMemMNode> updater =
@@ -1091,9 +1089,7 @@ public class MTreeBelowSGMemoryImpl {
}
}
final PartialPath path = node.getPartialPath();
- mapToFill
- .computeIfAbsent(DeviceSchemaCache.getLeadingSegment(path), o
-> new HashMap<>())
- .put(path, measurementMap);
+ mapToFill.put(path, measurementMap);
sensorNum[0] += measurementMap.size();
}
}) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 068990cb6f9..1856a5448b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2432,7 +2432,7 @@ public class DataRegion implements IDataRegionForQuery {
if (deleted) {
return;
}
-
DataNodeSchemaCache.getInstance().invalidateDatabaseLastCache(getDatabaseName());
+
DataNodeSchemaCache.getInstance().getDeviceSchemaCache().invalidateLastCache();
// write log to impacted working TsFileProcessors
List<WALFlushListener> walListeners =
logDeletionInWAL(startTime, endTime, searchIndex, pathToDelete);