This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d6135299cb [IOTDB-5474] Refactor DataNodeSchemaCache Structure (#9050)
d6135299cb is described below
commit d6135299cb3321d4b3db535f0a6a19198a5f9083
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Feb 16 14:32:29 2023 +0800
[IOTDB-5474] Refactor DataNodeSchemaCache Structure (#9050)
---
.../iotdb/db/engine/storagegroup/DataRegion.java | 4 +-
.../db/metadata/cache/DataNodeSchemaCache.java | 187 ++++++--------
.../iotdb/db/metadata/cache/SchemaCacheEntry.java | 18 +-
.../metadata/cache/dualkeycache/IDualKeyCache.java | 57 +++++
.../dualkeycache/IDualKeyCacheComputation.java | 40 +++
.../cache/dualkeycache/IDualKeyCacheStats.java | 42 +++
.../dualkeycache/impl/CacheEntryGroupImpl.java | 75 ++++++
.../dualkeycache/impl/CacheSizeComputerImpl.java | 55 ++++
.../cache/dualkeycache/impl/CacheStats.java | 110 ++++++++
.../dualkeycache/impl/DualKeyCacheBuilder.java | 85 +++++++
.../cache/dualkeycache/impl/DualKeyCacheImpl.java | 281 +++++++++++++++++++++
.../dualkeycache/impl/DualKeyCachePolicy.java | 24 ++
.../cache/dualkeycache/impl/ICacheEntry.java | 39 +++
.../cache/dualkeycache/impl/ICacheEntryGroup.java | 44 ++++
.../dualkeycache/impl/ICacheEntryManager.java | 42 +++
.../dualkeycache/impl/ICacheSizeComputer.java | 29 +++
.../dualkeycache/impl/LRUCacheEntryManager.java | 216 ++++++++++++++++
.../plan/analyze/schema/ClusterSchemaFetcher.java | 2 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 4 +-
.../db/metadata/cache/DataNodeSchemaCacheTest.java | 8 -
.../cache/dualkeycache/DualKeyCacheTest.java | 110 ++++++++
21 files changed, 1355 insertions(+), 117 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 7d08e61b29..87cac75994 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1886,7 +1886,7 @@ public class DataRegion implements IDataRegionForQuery {
// delete Last cache record if necessary
// todo implement more precise process
- DataNodeSchemaCache.getInstance().cleanUp();
+ DataNodeSchemaCache.getInstance().invalidateAll();
// write log to impacted working TsFileProcessors
List<WALFlushListener> walListeners =
@@ -2295,7 +2295,7 @@ public class DataRegion implements IDataRegionForQuery {
if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
return;
}
- DataNodeSchemaCache.getInstance().cleanUp();
+ DataNodeSchemaCache.getInstance().invalidateAll();
}
/**
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 714cf61aeb..c0004d3f31 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -24,14 +24,15 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCache;
+import
org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCacheComputation;
+import
org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCacheBuilder;
+import org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCachePolicy;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
-import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaComputation;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@@ -51,25 +53,28 @@ public class DataNodeSchemaCache {
private static final Logger logger =
LoggerFactory.getLogger(DataNodeSchemaCache.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
- private final Cache<PartialPath, SchemaCacheEntry> cache;
+ private final IDualKeyCache<PartialPath, String, SchemaCacheEntry>
dualKeyCache;
// cache update or clean have higher priority than cache read
private final ReentrantReadWriteLock readWriteLock = new
ReentrantReadWriteLock(false);
private DataNodeSchemaCache() {
- cache =
- Caffeine.newBuilder()
- .maximumWeight(config.getAllocateMemoryForSchemaCache())
- .weigher(
- (PartialPath key, SchemaCacheEntry value) ->
- PartialPath.estimateSize(key) +
SchemaCacheEntry.estimateSize(value))
- .recordStats()
+ DualKeyCacheBuilder<PartialPath, String, SchemaCacheEntry>
dualKeyCacheBuilder =
+ new DualKeyCacheBuilder<>();
+ dualKeyCache =
+ dualKeyCacheBuilder
+ .cacheEvictionPolicy(DualKeyCachePolicy.LRU)
+ .memoryCapacity(config.getAllocateMemoryForSchemaCache())
+ .firstKeySizeComputer(PartialPath::estimateSize)
+ .secondKeySizeComputer(s -> 32 + 2 * s.length())
+ .valueSizeComputer(SchemaCacheEntry::estimateSize)
.build();
+
MetricService.getInstance().addMetricSet(new
DataNodeSchemaCacheMetrics(this));
}
public double getHitRate() {
- return cache.stats().hitRate() * 100;
+ return dualKeyCache.stats().hitRate() * 100;
}
public static DataNodeSchemaCache getInstance() {
@@ -107,27 +112,40 @@ public class DataNodeSchemaCache {
public ClusterSchemaTree get(PartialPath devicePath, String[] measurements) {
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
Set<String> storageGroupSet = new HashSet<>();
- SchemaCacheEntry schemaCacheEntry;
- for (String measurement : measurements) {
- PartialPath path = devicePath.concatNode(measurement);
- schemaCacheEntry = cache.getIfPresent(path);
- if (schemaCacheEntry != null) {
- schemaTree.appendSingleMeasurement(
- devicePath.concatNode(schemaCacheEntry.getSchemaEntryId()),
- schemaCacheEntry.getMeasurementSchema(),
- schemaCacheEntry.getTagMap(),
- null,
- schemaCacheEntry.isAligned());
- storageGroupSet.add(schemaCacheEntry.getStorageGroup());
- }
- }
+
+ dualKeyCache.compute(
+ new IDualKeyCacheComputation<PartialPath, String, SchemaCacheEntry>() {
+ @Override
+ public PartialPath getFirstKey() {
+ return devicePath;
+ }
+
+ @Override
+ public String[] getSecondKeyList() {
+ return measurements;
+ }
+
+ @Override
+ public void computeValue(int index, SchemaCacheEntry value) {
+ if (value != null) {
+ schemaTree.appendSingleMeasurement(
+ devicePath.concatNode(value.getSchemaEntryId()),
+ value.getMeasurementSchema(),
+ value.getTagMap(),
+ null,
+ value.isAligned());
+ storageGroupSet.add(value.getStorageGroup());
+ }
+ }
+ });
schemaTree.setDatabases(storageGroupSet);
return schemaTree;
}
public ClusterSchemaTree get(PartialPath fullPath) {
+ SchemaCacheEntry schemaCacheEntry =
+ dualKeyCache.get(fullPath.getDevicePath(), fullPath.getMeasurement());
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- SchemaCacheEntry schemaCacheEntry = cache.getIfPresent(fullPath);
if (schemaCacheEntry != null) {
schemaTree.appendSingleMeasurement(
fullPath,
@@ -141,42 +159,33 @@ public class DataNodeSchemaCache {
}
public List<Integer> compute(ISchemaComputation schemaComputation) {
- PartialPath devicePath = schemaComputation.getDevicePath();
- String[] measurements = schemaComputation.getMeasurements();
List<Integer> indexOfMissingMeasurements = new ArrayList<>();
- boolean isFirstMeasurement = true;
- PartialPath fullPath;
- for (int i = 0, length = measurements.length; i < length; i++) {
- String measurement = measurements[i];
- fullPath = devicePath.concatNode(measurement);
- SchemaCacheEntry schemaCacheEntry = cache.getIfPresent(fullPath);
- if (schemaCacheEntry == null) {
- indexOfMissingMeasurements.add(i);
- } else {
- if (isFirstMeasurement) {
- schemaComputation.computeDevice(schemaCacheEntry.isAligned());
- isFirstMeasurement = false;
- }
- schemaComputation.computeMeasurement(
- i,
- new IMeasurementSchemaInfo() {
- @Override
- public String getName() {
- return measurement;
+ final AtomicBoolean isFirstMeasurement = new AtomicBoolean();
+ dualKeyCache.compute(
+ new IDualKeyCacheComputation<PartialPath, String, SchemaCacheEntry>() {
+ @Override
+ public PartialPath getFirstKey() {
+ return schemaComputation.getDevicePath();
+ }
+
+ @Override
+ public String[] getSecondKeyList() {
+ return schemaComputation.getMeasurements();
+ }
+
+ @Override
+ public void computeValue(int index, SchemaCacheEntry value) {
+ if (value == null) {
+ indexOfMissingMeasurements.add(index);
+ } else {
+ if (isFirstMeasurement.get()) {
+ schemaComputation.computeDevice(value.isAligned());
+ isFirstMeasurement.getAndSet(false);
}
-
- @Override
- public MeasurementSchema getSchema() {
- return schemaCacheEntry.getMeasurementSchema();
- }
-
- @Override
- public String getAlias() {
- throw new UnsupportedOperationException();
- }
- });
- }
- }
+ schemaComputation.computeMeasurement(index, value);
+ }
+ }
+ });
return indexOfMissingMeasurements;
}
@@ -193,11 +202,13 @@ public class DataNodeSchemaCache {
(MeasurementSchema) measurementPath.getMeasurementSchema(),
measurementPath.getTagMap(),
measurementPath.isUnderAlignedEntity());
- cache.put(new PartialPath(measurementPath.getNodes()), schemaCacheEntry);
+ dualKeyCache.put(
+ measurementPath.getDevicePath(), measurementPath.getMeasurement(),
schemaCacheEntry);
}
public TimeValuePair getLastCache(PartialPath seriesPath) {
- SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
+ SchemaCacheEntry entry =
+ dualKeyCache.get(seriesPath.getDevicePath(),
seriesPath.getMeasurement());
if (null == entry) {
return null;
}
@@ -211,7 +222,8 @@ public class DataNodeSchemaCache {
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
Long latestFlushedTime) {
- SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
+ SchemaCacheEntry entry =
+ dualKeyCache.get(seriesPath.getDevicePath(),
seriesPath.getMeasurement());
if (null == entry) {
return;
}
@@ -231,10 +243,11 @@ public class DataNodeSchemaCache {
boolean highPriorityUpdate,
Long latestFlushedTime) {
PartialPath seriesPath = measurementPath.transformToPartialPath();
- SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
+ SchemaCacheEntry entry =
+ dualKeyCache.get(seriesPath.getDevicePath(),
seriesPath.getMeasurement());
if (null == entry) {
- synchronized (cache) {
- entry = cache.getIfPresent(seriesPath);
+ synchronized (dualKeyCache) {
+ entry = dualKeyCache.get(seriesPath.getDevicePath(),
seriesPath.getMeasurement());
if (null == entry) {
entry =
new SchemaCacheEntry(
@@ -242,7 +255,7 @@ public class DataNodeSchemaCache {
(MeasurementSchema) measurementPath.getMeasurementSchema(),
measurementPath.getTagMap(),
measurementPath.isUnderAlignedEntity());
- cache.put(seriesPath, entry);
+ dualKeyCache.put(seriesPath.getDevicePath(),
seriesPath.getMeasurement(), entry);
}
}
}
@@ -251,43 +264,11 @@ public class DataNodeSchemaCache {
entry, timeValuePair, highPriorityUpdate, latestFlushedTime);
}
- public void resetLastCache(PartialPath seriesPath) {
- SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
- if (null == entry) {
- return;
- }
-
- DataNodeLastCacheManager.resetLastCache(entry);
- }
-
- /**
- * For delete timeseries meatadata cache operation
- *
- * @param partialPath
- * @return
- */
- public void invalidate(PartialPath partialPath) {
- resetLastCache(partialPath);
- cache.invalidate(partialPath);
- }
-
- public void invalidateMatchedSchema(PartialPath pathPattern) {
- cache
- .asMap()
- .forEach(
- (k, v) -> {
- if (pathPattern.matchFullPath(k)) {
- cache.invalidate(k);
- }
- });
- }
-
- public long estimatedSize() {
- return cache.estimatedSize();
+ public void invalidateAll() {
+ dualKeyCache.invalidateAll();
}
public void cleanUp() {
- cache.invalidateAll();
- cache.cleanUp();
+ dualKeyCache.cleanUp();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
index ad40c088e2..55387520aa 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -21,12 +21,13 @@ package org.apache.iotdb.db.metadata.cache;
import
org.apache.iotdb.db.metadata.cache.lastCache.container.ILastCacheContainer;
import
org.apache.iotdb.db.metadata.cache.lastCache.container.LastCacheContainer;
+import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.Map;
-public class SchemaCacheEntry {
+public class SchemaCacheEntry implements IMeasurementSchemaInfo {
private final String storageGroup;
@@ -109,4 +110,19 @@ public class SchemaCacheEntry {
// each char takes 2B in Java
return 100 + 2 *
schemaCacheEntry.getMeasurementSchema().getMeasurementId().length();
}
+
+ @Override
+ public String getName() {
+ return measurementSchema.getMeasurementId();
+ }
+
+ @Override
+ public MeasurementSchema getSchema() {
+ return measurementSchema;
+ }
+
+ @Override
+ public String getAlias() {
+ return null;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
new file mode 100644
index 0000000000..9f6fe21e6a
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache;
+
+/**
+ * This interfaces defines the behaviour of a dual key cache. A dual key cache
supports manage cache
+ * values via two keys, first key and second key. Simply, the structure is
like fk -> sk-> value.
+ *
+ * @param <FK> The first key of cache value
+ * @param <SK> The second key of cache value
+ * @param <V> The cache value
+ */
+public interface IDualKeyCache<FK, SK, V> {
+
+ /** Get the cache value with given first key and second key. */
+ V get(FK firstKey, SK secondKey);
+
+ /**
+ * Traverse target cache values via given first key and second keys provided
in computation and
+ * execute the defined computation logic.
+ */
+ void compute(IDualKeyCacheComputation<FK, SK, V> computation);
+
+ /** put the cache value into cache */
+ void put(FK firstKey, SK secondKey, V value);
+
+ /**
+ * Invalidate all cache values in the cache and clear related cache keys.
The cache status and
+ * statistics won't be clear and they can still be accessed via
cache.stats().
+ */
+ void invalidateAll();
+
+ /**
+ * Clean up all data and info of this cache, including cache keys, cache
values and cache stats.
+ */
+ void cleanUp();
+
+ /** Return all the current cache status and statistics. */
+ IDualKeyCacheStats stats();
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheComputation.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheComputation.java
new file mode 100644
index 0000000000..f0d66f2de6
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheComputation.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache;
+
+/**
+ * This interfaces defines the behaviour needed be implemented and executed
during cache value
+ * traverse.
+ *
+ * @param <FK> The first key of target cache values
+ * @param <SK> The second key of one target cache value
+ * @param <V> The cache value
+ */
+public interface IDualKeyCacheComputation<FK, SK, V> {
+
+ /** Return the first key of target cache values. */
+ FK getFirstKey();
+
+ /** Return the second key list of target cache values. */
+ SK[] getSecondKeyList();
+
+ /** Compute each target cache value. The index is the second key's position
in second key list. */
+ void computeValue(int index, V value);
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheStats.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheStats.java
new file mode 100644
index 0000000000..469f610b77
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheStats.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache;
+
+/** This interface defines the status and statistics, that will be provided ,
of dual key cache. */
+public interface IDualKeyCacheStats {
+
+ /**
+ * Return the count of recorded requests, since the cache has been utilized
after init or clean
+ * up.
+ */
+ long requestCount();
+
+ /**
+ * Return the count of recorded cache hit cases, since the cache has been
utilized after init or
+ * clean up.
+ */
+ long hitCount();
+
+ /** Return the hit rate of recorded cases, equal hitCount() /
requestCount(). */
+ double hitRate();
+
+ /** Return current memory usage of dual key cache. */
+ long memoryUsage();
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheEntryGroupImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheEntryGroupImpl.java
new file mode 100644
index 0000000000..81aa331ae6
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheEntryGroupImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+public class CacheEntryGroupImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
+ implements ICacheEntryGroup<FK, SK, V, T> {
+
+ private final FK firstKey;
+
+ private final Map<SK, T> cacheEntryMap = new ConcurrentHashMap<>();
+
+ CacheEntryGroupImpl(FK firstKey) {
+ this.firstKey = firstKey;
+ }
+
+ @Override
+ public FK getFirstKey() {
+ return firstKey;
+ }
+
+ @Override
+ public T getCacheEntry(SK secondKey) {
+ return cacheEntryMap.get(secondKey);
+ }
+
+ @Override
+ public T computeCacheEntry(SK secondKey, BiFunction<SK, T, T> computation) {
+ return cacheEntryMap.compute(secondKey, computation);
+ }
+
+ @Override
+ public T removeCacheEntry(SK secondKey) {
+ return cacheEntryMap.remove(secondKey);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return cacheEntryMap.isEmpty();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CacheEntryGroupImpl<?, ?, ?, ?> that = (CacheEntryGroupImpl<?, ?, ?, ?>) o;
+ return Objects.equals(firstKey, that.firstKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return firstKey.hashCode();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheSizeComputerImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheSizeComputerImpl.java
new file mode 100644
index 0000000000..dda4245972
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheSizeComputerImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import java.util.function.Function;
+
+class CacheSizeComputerImpl<FK, SK, V> implements ICacheSizeComputer<FK, SK,
V> {
+
+ private final Function<FK, Integer> firstKeySizeComputer;
+
+ private final Function<SK, Integer> secondKeySizeComputer;
+
+ private final Function<V, Integer> valueSizeComputer;
+
+ CacheSizeComputerImpl(
+ Function<FK, Integer> firstKeySizeComputer,
+ Function<SK, Integer> secondKeySizeComputer,
+ Function<V, Integer> valueSizeCompute) {
+ this.firstKeySizeComputer = firstKeySizeComputer;
+ this.secondKeySizeComputer = secondKeySizeComputer;
+ this.valueSizeComputer = valueSizeCompute;
+ }
+
+ @Override
+ public int computeFirstKeySize(FK firstKey) {
+ return firstKeySizeComputer.apply(firstKey);
+ }
+
+ @Override
+ public int computeSecondKeySize(SK secondKey) {
+ return secondKeySizeComputer.apply(secondKey);
+ }
+
+ @Override
+ public int computeValueSize(V value) {
+ return valueSizeComputer.apply(value);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheStats.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheStats.java
new file mode 100644
index 0000000000..8ab314a86f
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheStats.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCacheStats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+class CacheStats implements IDualKeyCacheStats {
+
+ // prepare some buffer for high load scenarios
+ private static final double MEMORY_THRESHOLD_RATIO = 0.8;
+
+ private final long memoryCapacity;
+ private final long memoryThreshold;
+
+ private final AtomicLong memoryUsage = new AtomicLong(0);
+
+ private final AtomicLong requestCount = new AtomicLong(0);
+ private final AtomicLong hitCount = new AtomicLong(0);
+
+ CacheStats(long memoryCapacity) {
+ this.memoryCapacity = memoryCapacity;
+ this.memoryThreshold = (long) (memoryCapacity * MEMORY_THRESHOLD_RATIO);
+ }
+
+ void increaseMemoryUsage(int size) {
+ memoryUsage.getAndAdd(size);
+ }
+
+ void decreaseMemoryUsage(int size) {
+ memoryUsage.getAndAdd(-size);
+ }
+
+ boolean isExceedMemoryCapacity() {
+ return memoryUsage.get() > memoryThreshold;
+ }
+
+ void recordHit(int num) {
+ if (requestCount.get() < 0) {
+ requestCount.set(0);
+ hitCount.set(0);
+ }
+ requestCount.getAndAdd(num);
+ hitCount.getAndAdd(num);
+ }
+
+ void recordMiss(int num) {
+ if (requestCount.get() < 0) {
+ requestCount.set(0);
+ hitCount.set(0);
+ }
+ requestCount.getAndAdd(num);
+ }
+
+ @Override
+ public long requestCount() {
+ return requestCount.get();
+ }
+
+ @Override
+ public long hitCount() {
+ return hitCount.get();
+ }
+
+ @Override
+ public double hitRate() {
+ long hitCount = this.hitCount.get();
+ if (hitCount == 0) {
+ return 0;
+ }
+ long requestCount = this.requestCount.get();
+ if (requestCount == 0) {
+ return 0;
+ }
+ return hitCount * 1.0 / requestCount;
+ }
+
+ @Override
+ public long memoryUsage() {
+ return memoryUsage.get();
+ }
+
+ void reset() {
+ resetMemoryUsage();
+ hitCount.set(0);
+ requestCount.set(0);
+ }
+
+ void resetMemoryUsage() {
+ memoryUsage.set(0);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
new file mode 100644
index 0000000000..d26cc8ac68
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCache;
+
+import java.util.function.Function;
+
+/**
+ * This class defines and implements the behaviour needed for building a dual
key cache.
+ *
+ * @param <FK> The first key of cache value
+ * @param <SK> The second key of cache value
+ * @param <V> The cache value
+ */
+public class DualKeyCacheBuilder<FK, SK, V> {
+
+ private LRUCacheEntryManager<FK, SK, V> cacheEntryManager;
+
+ private long memoryCapacity;
+
+ private Function<FK, Integer> firstKeySizeComputer;
+
+ private Function<SK, Integer> secondKeySizeComputer;
+
+ private Function<V, Integer> valueSizeComputer;
+
+ /** Initiate and return a dual key cache instance. */
+ public IDualKeyCache<FK, SK, V> build() {
+ return new DualKeyCacheImpl<>(
+ cacheEntryManager,
+ new CacheSizeComputerImpl<>(firstKeySizeComputer,
secondKeySizeComputer, valueSizeComputer),
+ memoryCapacity);
+ }
+
+ /** Define the cache eviction policy of dual key cache. */
+ public DualKeyCacheBuilder<FK, SK, V> cacheEvictionPolicy(DualKeyCachePolicy
policy) {
+ if (policy == DualKeyCachePolicy.LRU) {
+ this.cacheEntryManager = new LRUCacheEntryManager<>();
+ return this;
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Define the memory capacity of dual key cache. */
+ public DualKeyCacheBuilder<FK, SK, V> memoryCapacity(long memoryCapacity) {
+ this.memoryCapacity = memoryCapacity;
+ return this;
+ }
+
+ /** Define how to compute the memory usage of a first key in dual key cache.
*/
+ public DualKeyCacheBuilder<FK, SK, V> firstKeySizeComputer(Function<FK,
Integer> computer) {
+ this.firstKeySizeComputer = computer;
+ return this;
+ }
+
+ /** Define how to compute the memory usage of a second key in dual key
cache. */
+ public DualKeyCacheBuilder<FK, SK, V> secondKeySizeComputer(Function<SK,
Integer> computer) {
+ this.secondKeySizeComputer = computer;
+ return this;
+ }
+
+ /** Define how to compute the memory usage of a cache value in dual key
cache. */
+ public DualKeyCacheBuilder<FK, SK, V> valueSizeComputer(Function<V, Integer>
computer) {
+ this.valueSizeComputer = computer;
+ return this;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
new file mode 100644
index 0000000000..0704e75f7c
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCache;
+import
org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCacheComputation;
+import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCacheStats;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+
+class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
+ implements IDualKeyCache<FK, SK, V> {
+
+ private final SegmentedConcurrentHashMap<FK, ICacheEntryGroup<FK, SK, V, T>>
firstKeyMap =
+ new SegmentedConcurrentHashMap<>();
+
+ private final ICacheEntryManager<FK, SK, V, T> cacheEntryManager;
+
+ private final ICacheSizeComputer<FK, SK, V> sizeComputer;
+
+ private final CacheStats cacheStats;
+
+ private final ReentrantReadWriteLock readWriteLock = new
ReentrantReadWriteLock(true);
+
+ DualKeyCacheImpl(
+ ICacheEntryManager<FK, SK, V, T> cacheEntryManager,
+ ICacheSizeComputer<FK, SK, V> sizeComputer,
+ long memoryCapacity) {
+ this.cacheEntryManager = cacheEntryManager;
+ this.sizeComputer = sizeComputer;
+ this.cacheStats = new CacheStats(memoryCapacity);
+ }
+
+ @Override
+ public V get(FK firstKey, SK secondKey) {
+ readWriteLock.readLock().lock();
+ try {
+ ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
+ if (cacheEntryGroup == null) {
+ cacheStats.recordMiss(1);
+ return null;
+ } else {
+ T cacheEntry = cacheEntryGroup.getCacheEntry(secondKey);
+ if (cacheEntry == null) {
+ cacheStats.recordMiss(1);
+ return null;
+ } else {
+ cacheEntryManager.access(cacheEntry);
+ cacheStats.recordHit(1);
+ return cacheEntry.getValue();
+ }
+ }
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void compute(IDualKeyCacheComputation<FK, SK, V> computation) {
+ readWriteLock.readLock().lock();
+ try {
+ FK firstKey = computation.getFirstKey();
+ ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
+ SK[] secondKeyList = computation.getSecondKeyList();
+ if (cacheEntryGroup == null) {
+ for (int i = 0; i < secondKeyList.length; i++) {
+ computation.computeValue(i, null);
+ }
+ cacheStats.recordMiss(secondKeyList.length);
+ } else {
+ T cacheEntry;
+ int hitCount = 0;
+ for (int i = 0; i < secondKeyList.length; i++) {
+ cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
+ if (cacheEntry == null) {
+ computation.computeValue(i, null);
+ } else {
+ computation.computeValue(i, cacheEntry.getValue());
+ cacheEntryManager.access(cacheEntry);
+ hitCount++;
+ }
+ }
+ cacheStats.recordHit(hitCount);
+ cacheStats.recordMiss(secondKeyList.length - hitCount);
+ }
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void put(FK firstKey, SK secondKey, V value) {
+ readWriteLock.readLock().lock();
+ try {
+ int usedMemorySize = putToCache(firstKey, secondKey, value);
+ cacheStats.increaseMemoryUsage(usedMemorySize);
+ if (cacheStats.isExceedMemoryCapacity()) {
+ executeCacheEviction(usedMemorySize);
+ }
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ }
+
+ private int putToCache(FK firstKey, SK secondKey, V value) {
+ AtomicInteger usedMemorySize = new AtomicInteger(0);
+ firstKeyMap.compute(
+ firstKey,
+ (k, cacheEntryGroup) -> {
+ if (cacheEntryGroup == null) {
+ cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
+
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
+ }
+ ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
+ cacheEntryGroup.computeCacheEntry(
+ secondKey,
+ (sk, cacheEntry) -> {
+ if (cacheEntry == null) {
+ cacheEntry =
+ cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
+ cacheEntryManager.put(cacheEntry);
+
usedMemorySize.getAndAdd(sizeComputer.computeSecondKeySize(sk));
+ } else {
+ V existingValue = cacheEntry.getValue();
+ if (existingValue != value && !existingValue.equals(value)) {
+ cacheEntry.replaceValue(value);
+
usedMemorySize.getAndAdd(-sizeComputer.computeValueSize(existingValue));
+ }
+ // update the cache status
+ cacheEntryManager.access(cacheEntry);
+ }
+ usedMemorySize.getAndAdd(sizeComputer.computeValueSize(value));
+ return cacheEntry;
+ });
+ return cacheEntryGroup;
+ });
+ return usedMemorySize.get();
+ }
+
+ /**
+ * Each thread putting new cache value only needs to evict cache values,
total memory equals that
+ * the new cache value occupied.
+ */
+ private void executeCacheEviction(int targetSize) {
+ int evictedSize;
+ while (targetSize > 0 && cacheStats.memoryUsage() > 0) {
+ evictedSize = evictOneCacheEntry();
+ cacheStats.decreaseMemoryUsage(evictedSize);
+ targetSize -= evictedSize;
+ }
+ }
+
+ private int evictOneCacheEntry() {
+
+ ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
+ if (evictCacheEntry == null) {
+ return 0;
+ }
+ AtomicInteger evictedSize = new AtomicInteger(0);
+
evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue()));
+
+ ICacheEntryGroup<FK, SK, V, T> belongedGroup =
evictCacheEntry.getBelongedGroup();
+ belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
+
evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
+
+ if (belongedGroup.isEmpty()) {
+ firstKeyMap.compute(
+ belongedGroup.getFirstKey(),
+ (firstKey, cacheEntryGroup) -> {
+ if (cacheEntryGroup == null) {
+ // has been removed by other threads
+ return null;
+ }
+ if (cacheEntryGroup.isEmpty()) {
+
evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
+ return null;
+ }
+
+ // some other thread has put value to it
+ return cacheEntryGroup;
+ });
+ }
+ return evictedSize.get();
+ }
+
+ @Override
+ public void invalidateAll() {
+ readWriteLock.writeLock().lock();
+ try {
+ executeInvalidateAll();
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+
+ private void executeInvalidateAll() {
+ firstKeyMap.clear();
+ cacheEntryManager.cleanUp();
+ cacheStats.resetMemoryUsage();
+ }
+
+ @Override
+ public void cleanUp() {
+ readWriteLock.writeLock().lock();
+ try {
+ executeInvalidateAll();
+ cacheStats.reset();
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public IDualKeyCacheStats stats() {
+ return cacheStats;
+ }
+
+ /**
+ * Since the capacity of one instance of ConcurrentHashMap is about 4
million, a number of
+ * instances are united for more capacity.
+ */
+ private static class SegmentedConcurrentHashMap<K, V> {
+
+ private static final int SLOT_NUM = 31;
+
+ private final Map<K, V>[] maps = new ConcurrentHashMap[SLOT_NUM];
+
+ V get(K key) {
+ return getBelongedMap(key).get(key);
+ }
+
+ V compute(K key, BiFunction<? super K, ? super V, ? extends V>
remappingFunction) {
+ return getBelongedMap(key).compute(key, remappingFunction);
+ }
+
+ void clear() {
+ synchronized (maps) {
+ for (int i = 0; i < SLOT_NUM; i++) {
+ maps[i] = null;
+ }
+ }
+ }
+
+ Map<K, V> getBelongedMap(K key) {
+ int slotIndex = key.hashCode() % SLOT_NUM;
+ slotIndex = slotIndex < 0 ? slotIndex + SLOT_NUM : slotIndex;
+ Map<K, V> map = maps[slotIndex];
+ if (map == null) {
+ synchronized (maps) {
+ map = maps[slotIndex];
+ if (map == null) {
+ map = new ConcurrentHashMap<>();
+ maps[slotIndex] = map;
+ }
+ }
+ }
+ return map;
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
new file mode 100644
index 0000000000..0211a98231
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+public enum DualKeyCachePolicy {
+ LRU;
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntry.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntry.java
new file mode 100644
index 0000000000..b9e3808935
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntry.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+/**
+ * This interface defines the behaviour of a cache entry holding the cache
value. The cache entry is
+ * mainly accessed via second key from cache entry group and managed by cache
entry manager for
+ * cache eviction.
+ *
+ * @param <SK> The second key of cache value.
+ * @param <V> The cache value.
+ */
+interface ICacheEntry<SK, V> {
+
+ SK getSecondKey();
+
+ V getValue();
+
+ ICacheEntryGroup getBelongedGroup();
+
+ void replaceValue(V newValue);
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryGroup.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryGroup.java
new file mode 100644
index 0000000000..186c813dab
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryGroup.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import java.util.function.BiFunction;
+
+/**
+ * This interface defines the behaviour of a cache entry group, which is
mainly accessed via first
+ * key from dual key cache and holds all the second keys and cache values
indexed by the first key.
+ *
+ * @param <FK> The first key of cache value.
+ * @param <SK> The second key of cache value.
+ * @param <V> The cache value.
+ * @param <T> The cache entry holding cache value.
+ */
+interface ICacheEntryGroup<FK, SK, V, T extends ICacheEntry<SK, V>> {
+
+ FK getFirstKey();
+
+ T getCacheEntry(SK secondKey);
+
+ T computeCacheEntry(SK secondKey, BiFunction<SK, T, T> computation);
+
+ T removeCacheEntry(SK secondKey);
+
+ boolean isEmpty();
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryManager.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryManager.java
new file mode 100644
index 0000000000..d67dc2afbb
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+/**
+ * This interface defines the behaviour of a cache entry manager, which takes
the responsibility of
+ * cache value status management and cache eviction.
+ *
+ * @param <FK> The first key of cache value.
+ * @param <SK> The second key of cache value.
+ * @param <V> The cache value.
+ * @param <T> The cache entry holding cache value.
+ */
+interface ICacheEntryManager<FK, SK, V, T extends ICacheEntry<SK, V>> {
+
+ T createCacheEntry(SK secondKey, V value, ICacheEntryGroup<FK, SK, V, T>
cacheEntryGroup);
+
+ void access(T cacheEntry);
+
+ void put(T cacheEntry);
+
+ T evict();
+
+ void cleanUp();
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheSizeComputer.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheSizeComputer.java
new file mode 100644
index 0000000000..0187080604
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheSizeComputer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+interface ICacheSizeComputer<FK, SK, V> {
+
+ int computeFirstKeySize(FK firstKey);
+
+ int computeSecondKeySize(SK secondKey);
+
+ int computeValueSize(V value);
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/LRUCacheEntryManager.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/LRUCacheEntryManager.java
new file mode 100644
index 0000000000..e6ba42fc36
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/LRUCacheEntryManager.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import java.util.Objects;
+import java.util.Random;
+
+/**
+ * This class implements the cache entry manager with LRU policy.
+ *
+ * @param <FK> The first key of cache value.
+ * @param <SK> The second key of cache value.
+ * @param <V> The cache value.
+ */
+class LRUCacheEntryManager<FK, SK, V>
+ implements ICacheEntryManager<FK, SK, V,
LRUCacheEntryManager.LRUCacheEntry<SK, V>> {
+
+ private static final int SLOT_NUM = 128;
+
+ private final LRULinkedList[] lruLinkedLists = new LRULinkedList[SLOT_NUM];
+
+ private final Random idxGenerator = new Random();
+
+ @Override
+ public LRUCacheEntry<SK, V> createCacheEntry(
+ SK secondKey, V value, ICacheEntryGroup<FK, SK, V, LRUCacheEntry<SK, V>>
cacheEntryGroup) {
+ return new LRUCacheEntry<>(secondKey, value, cacheEntryGroup);
+ }
+
+ @Override
+ public void access(LRUCacheEntry<SK, V> cacheEntry) {
+ getBelongedList(cacheEntry).moveToHead(cacheEntry);
+ }
+
+ @Override
+ public void put(LRUCacheEntry<SK, V> cacheEntry) {
+ getBelongedList(cacheEntry).add(cacheEntry);
+ }
+
+ @Override
+ public LRUCacheEntry<SK, V> evict() {
+ int startIndex = idxGenerator.nextInt(SLOT_NUM);
+ LRULinkedList lruLinkedList;
+ LRUCacheEntry<SK, V> cacheEntry;
+ for (int i = 0; i < SLOT_NUM; i++) {
+ if (startIndex == SLOT_NUM) {
+ startIndex = 0;
+ }
+ lruLinkedList = lruLinkedLists[startIndex];
+ if (lruLinkedList != null) {
+ cacheEntry = lruLinkedList.evict();
+ if (cacheEntry != null) {
+ return cacheEntry;
+ }
+ }
+ startIndex++;
+ }
+ return null;
+ }
+
+ @Override
+ public void cleanUp() {
+ synchronized (lruLinkedLists) {
+ for (int i = 0; i < SLOT_NUM; i++) {
+ lruLinkedLists[i] = null;
+ }
+ }
+ }
+
+ private LRULinkedList getBelongedList(LRUCacheEntry<SK, V> cacheEntry) {
+ int slotIndex = cacheEntry.hashCode() % SLOT_NUM;
+ slotIndex = slotIndex < 0 ? slotIndex + SLOT_NUM : slotIndex;
+ LRULinkedList lruLinkedList = lruLinkedLists[slotIndex];
+ if (lruLinkedList == null) {
+ synchronized (lruLinkedLists) {
+ lruLinkedList = lruLinkedLists[slotIndex];
+ if (lruLinkedList == null) {
+ lruLinkedList = new LRULinkedList();
+ lruLinkedLists[slotIndex] = lruLinkedList;
+ }
+ }
+ }
+ return lruLinkedList;
+ }
+
+ static class LRUCacheEntry<SK, V> implements ICacheEntry<SK, V> {
+
+ private final SK secondKey;
+ private final ICacheEntryGroup cacheEntryGroup;
+
+ private V value;
+
+ private LRUCacheEntry<SK, V> pre;
+ private LRUCacheEntry<SK, V> next;
+
+ private LRUCacheEntry(SK secondKey, V value, ICacheEntryGroup
cacheEntryGroup) {
+ this.secondKey = secondKey;
+ this.value = value;
+ this.cacheEntryGroup = cacheEntryGroup;
+ }
+
+ @Override
+ public SK getSecondKey() {
+ return secondKey;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public ICacheEntryGroup getBelongedGroup() {
+ return cacheEntryGroup;
+ }
+
+ @Override
+ public void replaceValue(V newValue) {
+ this.value = newValue;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ LRUCacheEntry<?, ?> that = (LRUCacheEntry<?, ?>) o;
+ return Objects.equals(secondKey, that.secondKey)
+ && Objects.equals(cacheEntryGroup, that.cacheEntryGroup);
+ }
+
+ @Override
+ public int hashCode() {
+ return cacheEntryGroup.hashCode() * 31 + secondKey.hashCode();
+ }
+ }
+
+ private static class LRULinkedList {
+
+ private LRUCacheEntry head;
+ private LRUCacheEntry tail;
+
+ synchronized void add(LRUCacheEntry cacheEntry) {
+ if (head == null) {
+ head = cacheEntry;
+ tail = cacheEntry;
+ return;
+ }
+
+ head.pre = cacheEntry;
+ cacheEntry.next = head;
+
+ head = cacheEntry;
+ }
+
+ synchronized LRUCacheEntry evict() {
+ if (tail == null) {
+ return null;
+ }
+
+ LRUCacheEntry cacheEntry = tail;
+ tail = tail.pre;
+
+ if (tail == null) {
+ head = null;
+ } else {
+ tail.next = null;
+ }
+
+ cacheEntry.pre = null;
+
+ return cacheEntry;
+ }
+
+ synchronized void moveToHead(LRUCacheEntry cacheEntry) {
+ if (head == null) {
+ // this cache entry has been evicted and the cache list is empty
+ return;
+ }
+
+ if (cacheEntry.pre == null) {
+ // this entry is head
+ return;
+ }
+
+ cacheEntry.pre.next = cacheEntry.next;
+
+ if (cacheEntry.next != null) {
+ cacheEntry.next.pre = cacheEntry.pre;
+ }
+
+ cacheEntry.pre = null;
+
+ head.pre = cacheEntry;
+ cacheEntry.next = head;
+
+ head = cacheEntry;
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 002882d930..c743d530d3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -476,6 +476,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
@Override
public void invalidAllCache() {
- DataNodeSchemaCache.getInstance().cleanUp();
+ DataNodeSchemaCache.getInstance().invalidateAll();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 503e0f0a06..66b4c540cf 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -395,7 +395,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
- DataNodeSchemaCache.getInstance().cleanUp();
+ DataNodeSchemaCache.getInstance().invalidateAll();
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@@ -477,7 +477,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
cache.takeWriteLock();
try {
// todo implement precise timeseries clean rather than clean all
- cache.cleanUp();
+ cache.invalidateAll();
} finally {
cache.releaseWriteLock();
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index 88148499a6..c5504f9c25 100644
---
a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -88,7 +88,6 @@ public class DataNodeSchemaCacheTest {
TSDataType.BOOLEAN,
schemaCacheEntryMap.get(new
PartialPath("root.sg1.d1.s3")).getTsDataType());
Assert.assertNull(schemaCacheEntryMap.get(new
PartialPath("root.sg1.d1.s3")).getTagMap());
- Assert.assertEquals(3, dataNodeSchemaCache.estimatedSize());
String[] otherMeasurements = new String[3];
otherMeasurements[0] = "s3";
@@ -120,7 +119,6 @@ public class DataNodeSchemaCacheTest {
TSDataType.INT64,
schemaCacheEntryMap.get(new
PartialPath("root.sg1.d1.s5")).getTsDataType());
Assert.assertNull(schemaCacheEntryMap.get(new
PartialPath("root.sg1.d1.s4")).getTagMap());
- Assert.assertEquals(5, dataNodeSchemaCache.estimatedSize());
}
@Test
@@ -182,12 +180,6 @@ public class DataNodeSchemaCacheTest {
Assert.assertEquals(value3, cachedTimeValuePair3.getValue());
Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
-
- // invalid cache
- dataNodeSchemaCache.invalidate(seriesPath1);
- Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath1));
- Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
- Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
}
private ISchemaTree generateSchemaTree1() throws IllegalPathException {
diff --git
a/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
new file mode 100644
index 0000000000..7184124927
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache;
+
+import
org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCacheBuilder;
+import org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCachePolicy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DualKeyCacheTest {
+
+ @Test
+ public void testBasicReadPut() {
+ DualKeyCacheBuilder<String, String, String> dualKeyCacheBuilder = new
DualKeyCacheBuilder<>();
+ IDualKeyCache<String, String, String> dualKeyCache =
+ dualKeyCacheBuilder
+ .cacheEvictionPolicy(DualKeyCachePolicy.LRU)
+ .memoryCapacity(300)
+ .firstKeySizeComputer(this::computeStringSize)
+ .secondKeySizeComputer(this::computeStringSize)
+ .valueSizeComputer(this::computeStringSize)
+ .build();
+
+ String[] firstKeyList = new String[] {"root.db.d1", "root.db.d2"};
+ String[] secondKeyList = new String[] {"s1", "s2"};
+ String[][] valueTable =
+ new String[][] {new String[] {"1-1", "1-2"}, new String[] {"2-1",
"2-2"}};
+
+ for (int i = 0; i < firstKeyList.length; i++) {
+ for (int j = 0; j < secondKeyList.length; j++) {
+ dualKeyCache.put(firstKeyList[i], secondKeyList[j], valueTable[i][j]);
+ }
+ }
+
+ int firstKeyOfMissingEntry = -1, secondKeyOfMissingEntry = -1;
+ for (int i = 0; i < firstKeyList.length; i++) {
+ for (int j = 0; j < secondKeyList.length; j++) {
+ String value = dualKeyCache.get(firstKeyList[i], secondKeyList[j]);
+ if (value == null) {
+ if (firstKeyOfMissingEntry == -1) {
+ firstKeyOfMissingEntry = i;
+ secondKeyOfMissingEntry = j;
+ } else {
+ Assert.fail();
+ }
+ } else {
+ Assert.assertEquals(valueTable[i][j], value);
+ }
+ }
+ }
+
+ Assert.assertEquals(230, dualKeyCache.stats().memoryUsage());
+ Assert.assertEquals(4, dualKeyCache.stats().requestCount());
+ Assert.assertEquals(3, dualKeyCache.stats().hitCount());
+
+ dualKeyCache.put(
+ firstKeyList[firstKeyOfMissingEntry],
+ secondKeyList[secondKeyOfMissingEntry],
+ valueTable[firstKeyOfMissingEntry][secondKeyOfMissingEntry]);
+ Assert.assertEquals(230, dualKeyCache.stats().memoryUsage());
+
+ for (int i = 0; i < firstKeyList.length; i++) {
+ int finalI = i;
+ dualKeyCache.compute(
+ new IDualKeyCacheComputation<String, String, String>() {
+ @Override
+ public String getFirstKey() {
+ return firstKeyList[finalI];
+ }
+
+ @Override
+ public String[] getSecondKeyList() {
+ return secondKeyList;
+ }
+
+ @Override
+ public void computeValue(int index, String value) {
+ if (value != null) {
+ Assert.assertEquals(valueTable[finalI][index], value);
+ }
+ }
+ });
+ }
+
+ Assert.assertEquals(8, dualKeyCache.stats().requestCount());
+ Assert.assertEquals(6, dualKeyCache.stats().hitCount());
+ }
+
+ private int computeStringSize(String string) {
+ return 8 + 8 + 4 + 2 * string.length();
+ }
+}