This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d6135299cb [IOTDB-5474] Refactor DataNodeSchemaCache Structure (#9050)
d6135299cb is described below

commit d6135299cb3321d4b3db535f0a6a19198a5f9083
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Feb 16 14:32:29 2023 +0800

    [IOTDB-5474] Refactor DataNodeSchemaCache Structure (#9050)
---
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   4 +-
 .../db/metadata/cache/DataNodeSchemaCache.java     | 187 ++++++--------
 .../iotdb/db/metadata/cache/SchemaCacheEntry.java  |  18 +-
 .../metadata/cache/dualkeycache/IDualKeyCache.java |  57 +++++
 .../dualkeycache/IDualKeyCacheComputation.java     |  40 +++
 .../cache/dualkeycache/IDualKeyCacheStats.java     |  42 +++
 .../dualkeycache/impl/CacheEntryGroupImpl.java     |  75 ++++++
 .../dualkeycache/impl/CacheSizeComputerImpl.java   |  55 ++++
 .../cache/dualkeycache/impl/CacheStats.java        | 110 ++++++++
 .../dualkeycache/impl/DualKeyCacheBuilder.java     |  85 +++++++
 .../cache/dualkeycache/impl/DualKeyCacheImpl.java  | 281 +++++++++++++++++++++
 .../dualkeycache/impl/DualKeyCachePolicy.java      |  24 ++
 .../cache/dualkeycache/impl/ICacheEntry.java       |  39 +++
 .../cache/dualkeycache/impl/ICacheEntryGroup.java  |  44 ++++
 .../dualkeycache/impl/ICacheEntryManager.java      |  42 +++
 .../dualkeycache/impl/ICacheSizeComputer.java      |  29 +++
 .../dualkeycache/impl/LRUCacheEntryManager.java    | 216 ++++++++++++++++
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |   2 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   4 +-
 .../db/metadata/cache/DataNodeSchemaCacheTest.java |   8 -
 .../cache/dualkeycache/DualKeyCacheTest.java       | 110 ++++++++
 21 files changed, 1355 insertions(+), 117 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 7d08e61b29..87cac75994 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1886,7 +1886,7 @@ public class DataRegion implements IDataRegionForQuery {
 
       // delete Last cache record if necessary
       // todo implement more precise process
-      DataNodeSchemaCache.getInstance().cleanUp();
+      DataNodeSchemaCache.getInstance().invalidateAll();
 
       // write log to impacted working TsFileProcessors
       List<WALFlushListener> walListeners =
@@ -2295,7 +2295,7 @@ public class DataRegion implements IDataRegionForQuery {
     if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
       return;
     }
-    DataNodeSchemaCache.getInstance().cleanUp();
+    DataNodeSchemaCache.getInstance().invalidateAll();
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 714cf61aeb..c0004d3f31 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -24,14 +24,15 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCache;
+import 
org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCacheComputation;
+import 
org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCacheBuilder;
+import org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCachePolicy;
 import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
-import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaComputation;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -51,25 +53,28 @@ public class DataNodeSchemaCache {
   private static final Logger logger = 
LoggerFactory.getLogger(DataNodeSchemaCache.class);
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
-  private final Cache<PartialPath, SchemaCacheEntry> cache;
+  private final IDualKeyCache<PartialPath, String, SchemaCacheEntry> 
dualKeyCache;
 
   // cache update or clean have higher priority than cache read
   private final ReentrantReadWriteLock readWriteLock = new 
ReentrantReadWriteLock(false);
 
   private DataNodeSchemaCache() {
-    cache =
-        Caffeine.newBuilder()
-            .maximumWeight(config.getAllocateMemoryForSchemaCache())
-            .weigher(
-                (PartialPath key, SchemaCacheEntry value) ->
-                    PartialPath.estimateSize(key) + 
SchemaCacheEntry.estimateSize(value))
-            .recordStats()
+    DualKeyCacheBuilder<PartialPath, String, SchemaCacheEntry> 
dualKeyCacheBuilder =
+        new DualKeyCacheBuilder<>();
+    dualKeyCache =
+        dualKeyCacheBuilder
+            .cacheEvictionPolicy(DualKeyCachePolicy.LRU)
+            .memoryCapacity(config.getAllocateMemoryForSchemaCache())
+            .firstKeySizeComputer(PartialPath::estimateSize)
+            .secondKeySizeComputer(s -> 32 + 2 * s.length())
+            .valueSizeComputer(SchemaCacheEntry::estimateSize)
             .build();
+
     MetricService.getInstance().addMetricSet(new 
DataNodeSchemaCacheMetrics(this));
   }
 
   public double getHitRate() {
-    return cache.stats().hitRate() * 100;
+    return dualKeyCache.stats().hitRate() * 100;
   }
 
   public static DataNodeSchemaCache getInstance() {
@@ -107,27 +112,40 @@ public class DataNodeSchemaCache {
   public ClusterSchemaTree get(PartialPath devicePath, String[] measurements) {
     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
     Set<String> storageGroupSet = new HashSet<>();
-    SchemaCacheEntry schemaCacheEntry;
-    for (String measurement : measurements) {
-      PartialPath path = devicePath.concatNode(measurement);
-      schemaCacheEntry = cache.getIfPresent(path);
-      if (schemaCacheEntry != null) {
-        schemaTree.appendSingleMeasurement(
-            devicePath.concatNode(schemaCacheEntry.getSchemaEntryId()),
-            schemaCacheEntry.getMeasurementSchema(),
-            schemaCacheEntry.getTagMap(),
-            null,
-            schemaCacheEntry.isAligned());
-        storageGroupSet.add(schemaCacheEntry.getStorageGroup());
-      }
-    }
+
+    dualKeyCache.compute(
+        new IDualKeyCacheComputation<PartialPath, String, SchemaCacheEntry>() {
+          @Override
+          public PartialPath getFirstKey() {
+            return devicePath;
+          }
+
+          @Override
+          public String[] getSecondKeyList() {
+            return measurements;
+          }
+
+          @Override
+          public void computeValue(int index, SchemaCacheEntry value) {
+            if (value != null) {
+              schemaTree.appendSingleMeasurement(
+                  devicePath.concatNode(value.getSchemaEntryId()),
+                  value.getMeasurementSchema(),
+                  value.getTagMap(),
+                  null,
+                  value.isAligned());
+              storageGroupSet.add(value.getStorageGroup());
+            }
+          }
+        });
     schemaTree.setDatabases(storageGroupSet);
     return schemaTree;
   }
 
   public ClusterSchemaTree get(PartialPath fullPath) {
+    SchemaCacheEntry schemaCacheEntry =
+        dualKeyCache.get(fullPath.getDevicePath(), fullPath.getMeasurement());
     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-    SchemaCacheEntry schemaCacheEntry = cache.getIfPresent(fullPath);
     if (schemaCacheEntry != null) {
       schemaTree.appendSingleMeasurement(
           fullPath,
@@ -141,42 +159,33 @@ public class DataNodeSchemaCache {
   }
 
   public List<Integer> compute(ISchemaComputation schemaComputation) {
-    PartialPath devicePath = schemaComputation.getDevicePath();
-    String[] measurements = schemaComputation.getMeasurements();
     List<Integer> indexOfMissingMeasurements = new ArrayList<>();
-    boolean isFirstMeasurement = true;
-    PartialPath fullPath;
-    for (int i = 0, length = measurements.length; i < length; i++) {
-      String measurement = measurements[i];
-      fullPath = devicePath.concatNode(measurement);
-      SchemaCacheEntry schemaCacheEntry = cache.getIfPresent(fullPath);
-      if (schemaCacheEntry == null) {
-        indexOfMissingMeasurements.add(i);
-      } else {
-        if (isFirstMeasurement) {
-          schemaComputation.computeDevice(schemaCacheEntry.isAligned());
-          isFirstMeasurement = false;
-        }
-        schemaComputation.computeMeasurement(
-            i,
-            new IMeasurementSchemaInfo() {
-              @Override
-              public String getName() {
-                return measurement;
+    final AtomicBoolean isFirstMeasurement = new AtomicBoolean();
+    dualKeyCache.compute(
+        new IDualKeyCacheComputation<PartialPath, String, SchemaCacheEntry>() {
+          @Override
+          public PartialPath getFirstKey() {
+            return schemaComputation.getDevicePath();
+          }
+
+          @Override
+          public String[] getSecondKeyList() {
+            return schemaComputation.getMeasurements();
+          }
+
+          @Override
+          public void computeValue(int index, SchemaCacheEntry value) {
+            if (value == null) {
+              indexOfMissingMeasurements.add(index);
+            } else {
+              if (isFirstMeasurement.get()) {
+                schemaComputation.computeDevice(value.isAligned());
+                isFirstMeasurement.getAndSet(false);
               }
-
-              @Override
-              public MeasurementSchema getSchema() {
-                return schemaCacheEntry.getMeasurementSchema();
-              }
-
-              @Override
-              public String getAlias() {
-                throw new UnsupportedOperationException();
-              }
-            });
-      }
-    }
+              schemaComputation.computeMeasurement(index, value);
+            }
+          }
+        });
     return indexOfMissingMeasurements;
   }
 
@@ -193,11 +202,13 @@ public class DataNodeSchemaCache {
             (MeasurementSchema) measurementPath.getMeasurementSchema(),
             measurementPath.getTagMap(),
             measurementPath.isUnderAlignedEntity());
-    cache.put(new PartialPath(measurementPath.getNodes()), schemaCacheEntry);
+    dualKeyCache.put(
+        measurementPath.getDevicePath(), measurementPath.getMeasurement(), 
schemaCacheEntry);
   }
 
   public TimeValuePair getLastCache(PartialPath seriesPath) {
-    SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
+    SchemaCacheEntry entry =
+        dualKeyCache.get(seriesPath.getDevicePath(), 
seriesPath.getMeasurement());
     if (null == entry) {
       return null;
     }
@@ -211,7 +222,8 @@ public class DataNodeSchemaCache {
       TimeValuePair timeValuePair,
       boolean highPriorityUpdate,
       Long latestFlushedTime) {
-    SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
+    SchemaCacheEntry entry =
+        dualKeyCache.get(seriesPath.getDevicePath(), 
seriesPath.getMeasurement());
     if (null == entry) {
       return;
     }
@@ -231,10 +243,11 @@ public class DataNodeSchemaCache {
       boolean highPriorityUpdate,
       Long latestFlushedTime) {
     PartialPath seriesPath = measurementPath.transformToPartialPath();
-    SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
+    SchemaCacheEntry entry =
+        dualKeyCache.get(seriesPath.getDevicePath(), 
seriesPath.getMeasurement());
     if (null == entry) {
-      synchronized (cache) {
-        entry = cache.getIfPresent(seriesPath);
+      synchronized (dualKeyCache) {
+        entry = dualKeyCache.get(seriesPath.getDevicePath(), 
seriesPath.getMeasurement());
         if (null == entry) {
           entry =
               new SchemaCacheEntry(
@@ -242,7 +255,7 @@ public class DataNodeSchemaCache {
                   (MeasurementSchema) measurementPath.getMeasurementSchema(),
                   measurementPath.getTagMap(),
                   measurementPath.isUnderAlignedEntity());
-          cache.put(seriesPath, entry);
+          dualKeyCache.put(seriesPath.getDevicePath(), 
seriesPath.getMeasurement(), entry);
         }
       }
     }
@@ -251,43 +264,11 @@ public class DataNodeSchemaCache {
         entry, timeValuePair, highPriorityUpdate, latestFlushedTime);
   }
 
-  public void resetLastCache(PartialPath seriesPath) {
-    SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
-    if (null == entry) {
-      return;
-    }
-
-    DataNodeLastCacheManager.resetLastCache(entry);
-  }
-
-  /**
-   * For delete timeseries meatadata cache operation
-   *
-   * @param partialPath
-   * @return
-   */
-  public void invalidate(PartialPath partialPath) {
-    resetLastCache(partialPath);
-    cache.invalidate(partialPath);
-  }
-
-  public void invalidateMatchedSchema(PartialPath pathPattern) {
-    cache
-        .asMap()
-        .forEach(
-            (k, v) -> {
-              if (pathPattern.matchFullPath(k)) {
-                cache.invalidate(k);
-              }
-            });
-  }
-
-  public long estimatedSize() {
-    return cache.estimatedSize();
+  public void invalidateAll() {
+    dualKeyCache.invalidateAll();
   }
 
   public void cleanUp() {
-    cache.invalidateAll();
-    cache.cleanUp();
+    dualKeyCache.cleanUp();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
index ad40c088e2..55387520aa 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -21,12 +21,13 @@ package org.apache.iotdb.db.metadata.cache;
 
 import 
org.apache.iotdb.db.metadata.cache.lastCache.container.ILastCacheContainer;
 import 
org.apache.iotdb.db.metadata.cache.lastCache.container.LastCacheContainer;
+import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.Map;
 
-public class SchemaCacheEntry {
+public class SchemaCacheEntry implements IMeasurementSchemaInfo {
 
   private final String storageGroup;
 
@@ -109,4 +110,19 @@ public class SchemaCacheEntry {
     // each char takes 2B in Java
     return 100 + 2 * 
schemaCacheEntry.getMeasurementSchema().getMeasurementId().length();
   }
+
+  @Override
+  public String getName() {
+    return measurementSchema.getMeasurementId();
+  }
+
+  @Override
+  public MeasurementSchema getSchema() {
+    return measurementSchema;
+  }
+
+  @Override
+  public String getAlias() {
+    return null;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
new file mode 100644
index 0000000000..9f6fe21e6a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache;
+
+/**
+ * This interfaces defines the behaviour of a dual key cache. A dual key cache 
supports manage cache
+ * values via two keys, first key and second key. Simply, the structure is 
like fk -> sk-> value.
+ *
+ * @param <FK> The first key of cache value
+ * @param <SK> The second key of cache value
+ * @param <V> The cache value
+ */
+public interface IDualKeyCache<FK, SK, V> {
+
+  /** Get the cache value with given first key and second key. */
+  V get(FK firstKey, SK secondKey);
+
+  /**
+   * Traverse target cache values via given first key and second keys provided 
in computation and
+   * execute the defined computation logic.
+   */
+  void compute(IDualKeyCacheComputation<FK, SK, V> computation);
+
+  /** put the cache value into cache */
+  void put(FK firstKey, SK secondKey, V value);
+
+  /**
+   * Invalidate all cache values in the cache and clear related cache keys. 
The cache status and
+   * statistics won't be clear and they can still be accessed via 
cache.stats().
+   */
+  void invalidateAll();
+
+  /**
+   * Clean up all data and info of this cache, including cache keys, cache 
values and cache stats.
+   */
+  void cleanUp();
+
+  /** Return all the current cache status and statistics. */
+  IDualKeyCacheStats stats();
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheComputation.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheComputation.java
new file mode 100644
index 0000000000..f0d66f2de6
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheComputation.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache;
+
+/**
+ * This interfaces defines the behaviour needed be implemented and executed 
during cache value
+ * traverse.
+ *
+ * @param <FK> The first key of target cache values
+ * @param <SK> The second key of one target cache value
+ * @param <V> The cache value
+ */
+public interface IDualKeyCacheComputation<FK, SK, V> {
+
+  /** Return the first key of target cache values. */
+  FK getFirstKey();
+
+  /** Return the second key list of target cache values. */
+  SK[] getSecondKeyList();
+
+  /** Compute each target cache value. The index is the second key's position 
in second key list. */
+  void computeValue(int index, V value);
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheStats.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheStats.java
new file mode 100644
index 0000000000..469f610b77
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCacheStats.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache;
+
+/** This interface defines the status and statistics, that will be provided , 
of dual key cache. */
+public interface IDualKeyCacheStats {
+
+  /**
+   * Return the count of recorded requests, since the cache has been utilized 
after init or clean
+   * up.
+   */
+  long requestCount();
+
+  /**
+   * Return the count of recorded cache hit cases, since the cache has been 
utilized after init or
+   * clean up.
+   */
+  long hitCount();
+
+  /** Return the hit rate of recorded cases, equal hitCount() / 
requestCount(). */
+  double hitRate();
+
+  /** Return current memory usage of dual key cache. */
+  long memoryUsage();
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheEntryGroupImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheEntryGroupImpl.java
new file mode 100644
index 0000000000..81aa331ae6
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheEntryGroupImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+public class CacheEntryGroupImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
+    implements ICacheEntryGroup<FK, SK, V, T> {
+
+  private final FK firstKey;
+
+  private final Map<SK, T> cacheEntryMap = new ConcurrentHashMap<>();
+
+  CacheEntryGroupImpl(FK firstKey) {
+    this.firstKey = firstKey;
+  }
+
+  @Override
+  public FK getFirstKey() {
+    return firstKey;
+  }
+
+  @Override
+  public T getCacheEntry(SK secondKey) {
+    return cacheEntryMap.get(secondKey);
+  }
+
+  @Override
+  public T computeCacheEntry(SK secondKey, BiFunction<SK, T, T> computation) {
+    return cacheEntryMap.compute(secondKey, computation);
+  }
+
+  @Override
+  public T removeCacheEntry(SK secondKey) {
+    return cacheEntryMap.remove(secondKey);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return cacheEntryMap.isEmpty();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CacheEntryGroupImpl<?, ?, ?, ?> that = (CacheEntryGroupImpl<?, ?, ?, ?>) o;
+    return Objects.equals(firstKey, that.firstKey);
+  }
+
+  @Override
+  public int hashCode() {
+    return firstKey.hashCode();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheSizeComputerImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheSizeComputerImpl.java
new file mode 100644
index 0000000000..dda4245972
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheSizeComputerImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import java.util.function.Function;
+
+class CacheSizeComputerImpl<FK, SK, V> implements ICacheSizeComputer<FK, SK, 
V> {
+
+  private final Function<FK, Integer> firstKeySizeComputer;
+
+  private final Function<SK, Integer> secondKeySizeComputer;
+
+  private final Function<V, Integer> valueSizeComputer;
+
+  CacheSizeComputerImpl(
+      Function<FK, Integer> firstKeySizeComputer,
+      Function<SK, Integer> secondKeySizeComputer,
+      Function<V, Integer> valueSizeCompute) {
+    this.firstKeySizeComputer = firstKeySizeComputer;
+    this.secondKeySizeComputer = secondKeySizeComputer;
+    this.valueSizeComputer = valueSizeCompute;
+  }
+
+  @Override
+  public int computeFirstKeySize(FK firstKey) {
+    return firstKeySizeComputer.apply(firstKey);
+  }
+
+  @Override
+  public int computeSecondKeySize(SK secondKey) {
+    return secondKeySizeComputer.apply(secondKey);
+  }
+
+  @Override
+  public int computeValueSize(V value) {
+    return valueSizeComputer.apply(value);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheStats.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheStats.java
new file mode 100644
index 0000000000..8ab314a86f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheStats.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCacheStats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+class CacheStats implements IDualKeyCacheStats {
+
+  // prepare some buffer for high load scenarios
+  private static final double MEMORY_THRESHOLD_RATIO = 0.8;
+
+  private final long memoryCapacity;
+  private final long memoryThreshold;
+
+  private final AtomicLong memoryUsage = new AtomicLong(0);
+
+  private final AtomicLong requestCount = new AtomicLong(0);
+  private final AtomicLong hitCount = new AtomicLong(0);
+
+  CacheStats(long memoryCapacity) {
+    this.memoryCapacity = memoryCapacity;
+    this.memoryThreshold = (long) (memoryCapacity * MEMORY_THRESHOLD_RATIO);
+  }
+
+  void increaseMemoryUsage(int size) {
+    memoryUsage.getAndAdd(size);
+  }
+
+  void decreaseMemoryUsage(int size) {
+    memoryUsage.getAndAdd(-size);
+  }
+
+  boolean isExceedMemoryCapacity() {
+    return memoryUsage.get() > memoryThreshold;
+  }
+
+  void recordHit(int num) {
+    if (requestCount.get() < 0) {
+      requestCount.set(0);
+      hitCount.set(0);
+    }
+    requestCount.getAndAdd(num);
+    hitCount.getAndAdd(num);
+  }
+
+  void recordMiss(int num) {
+    if (requestCount.get() < 0) {
+      requestCount.set(0);
+      hitCount.set(0);
+    }
+    requestCount.getAndAdd(num);
+  }
+
+  @Override
+  public long requestCount() {
+    return requestCount.get();
+  }
+
+  @Override
+  public long hitCount() {
+    return hitCount.get();
+  }
+
+  @Override
+  public double hitRate() {
+    long hitCount = this.hitCount.get();
+    if (hitCount == 0) {
+      return 0;
+    }
+    long requestCount = this.requestCount.get();
+    if (requestCount == 0) {
+      return 0;
+    }
+    return hitCount * 1.0 / requestCount;
+  }
+
+  @Override
+  public long memoryUsage() {
+    return memoryUsage.get();
+  }
+
+  void reset() {
+    resetMemoryUsage();
+    hitCount.set(0);
+    requestCount.set(0);
+  }
+
+  void resetMemoryUsage() {
+    memoryUsage.set(0);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
new file mode 100644
index 0000000000..d26cc8ac68
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCache;
+
+import java.util.function.Function;
+
+/**
+ * This class defines and implements the behaviour needed for building a dual 
key cache.
+ *
+ * @param <FK> The first key of cache value
+ * @param <SK> The second key of cache value
+ * @param <V> The cache value
+ */
+public class DualKeyCacheBuilder<FK, SK, V> {
+
+  private LRUCacheEntryManager<FK, SK, V> cacheEntryManager;
+
+  private long memoryCapacity;
+
+  private Function<FK, Integer> firstKeySizeComputer;
+
+  private Function<SK, Integer> secondKeySizeComputer;
+
+  private Function<V, Integer> valueSizeComputer;
+
+  /** Initiate and return a dual key cache instance. */
+  public IDualKeyCache<FK, SK, V> build() {
+    return new DualKeyCacheImpl<>(
+        cacheEntryManager,
+        new CacheSizeComputerImpl<>(firstKeySizeComputer, 
secondKeySizeComputer, valueSizeComputer),
+        memoryCapacity);
+  }
+
+  /** Define the cache eviction policy of dual key cache. */
+  public DualKeyCacheBuilder<FK, SK, V> cacheEvictionPolicy(DualKeyCachePolicy 
policy) {
+    if (policy == DualKeyCachePolicy.LRU) {
+      this.cacheEntryManager = new LRUCacheEntryManager<>();
+      return this;
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Define the memory capacity of dual key cache. */
+  public DualKeyCacheBuilder<FK, SK, V> memoryCapacity(long memoryCapacity) {
+    this.memoryCapacity = memoryCapacity;
+    return this;
+  }
+
+  /** Define how to compute the memory usage of a first key in dual key cache. 
*/
+  public DualKeyCacheBuilder<FK, SK, V> firstKeySizeComputer(Function<FK, 
Integer> computer) {
+    this.firstKeySizeComputer = computer;
+    return this;
+  }
+
+  /** Define how to compute the memory usage of a second key in dual key 
cache. */
+  public DualKeyCacheBuilder<FK, SK, V> secondKeySizeComputer(Function<SK, 
Integer> computer) {
+    this.secondKeySizeComputer = computer;
+    return this;
+  }
+
+  /** Define how to compute the memory usage of a cache value in dual key 
cache. */
+  public DualKeyCacheBuilder<FK, SK, V> valueSizeComputer(Function<V, Integer> 
computer) {
+    this.valueSizeComputer = computer;
+    return this;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
new file mode 100644
index 0000000000..0704e75f7c
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCache;
+import 
org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCacheComputation;
+import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCacheStats;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+
+class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
+    implements IDualKeyCache<FK, SK, V> {
+
+  private final SegmentedConcurrentHashMap<FK, ICacheEntryGroup<FK, SK, V, T>> 
firstKeyMap =
+      new SegmentedConcurrentHashMap<>();
+
+  private final ICacheEntryManager<FK, SK, V, T> cacheEntryManager;
+
+  private final ICacheSizeComputer<FK, SK, V> sizeComputer;
+
+  private final CacheStats cacheStats;
+
+  private final ReentrantReadWriteLock readWriteLock = new 
ReentrantReadWriteLock(true);
+
+  DualKeyCacheImpl(
+      ICacheEntryManager<FK, SK, V, T> cacheEntryManager,
+      ICacheSizeComputer<FK, SK, V> sizeComputer,
+      long memoryCapacity) {
+    this.cacheEntryManager = cacheEntryManager;
+    this.sizeComputer = sizeComputer;
+    this.cacheStats = new CacheStats(memoryCapacity);
+  }
+
+  @Override
+  public V get(FK firstKey, SK secondKey) {
+    readWriteLock.readLock().lock();
+    try {
+      ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = 
firstKeyMap.get(firstKey);
+      if (cacheEntryGroup == null) {
+        cacheStats.recordMiss(1);
+        return null;
+      } else {
+        T cacheEntry = cacheEntryGroup.getCacheEntry(secondKey);
+        if (cacheEntry == null) {
+          cacheStats.recordMiss(1);
+          return null;
+        } else {
+          cacheEntryManager.access(cacheEntry);
+          cacheStats.recordHit(1);
+          return cacheEntry.getValue();
+        }
+      }
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void compute(IDualKeyCacheComputation<FK, SK, V> computation) {
+    readWriteLock.readLock().lock();
+    try {
+      FK firstKey = computation.getFirstKey();
+      ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = 
firstKeyMap.get(firstKey);
+      SK[] secondKeyList = computation.getSecondKeyList();
+      if (cacheEntryGroup == null) {
+        for (int i = 0; i < secondKeyList.length; i++) {
+          computation.computeValue(i, null);
+        }
+        cacheStats.recordMiss(secondKeyList.length);
+      } else {
+        T cacheEntry;
+        int hitCount = 0;
+        for (int i = 0; i < secondKeyList.length; i++) {
+          cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
+          if (cacheEntry == null) {
+            computation.computeValue(i, null);
+          } else {
+            computation.computeValue(i, cacheEntry.getValue());
+            cacheEntryManager.access(cacheEntry);
+            hitCount++;
+          }
+        }
+        cacheStats.recordHit(hitCount);
+        cacheStats.recordMiss(secondKeyList.length - hitCount);
+      }
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void put(FK firstKey, SK secondKey, V value) {
+    readWriteLock.readLock().lock();
+    try {
+      int usedMemorySize = putToCache(firstKey, secondKey, value);
+      cacheStats.increaseMemoryUsage(usedMemorySize);
+      if (cacheStats.isExceedMemoryCapacity()) {
+        executeCacheEviction(usedMemorySize);
+      }
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  private int putToCache(FK firstKey, SK secondKey, V value) {
+    AtomicInteger usedMemorySize = new AtomicInteger(0);
+    firstKeyMap.compute(
+        firstKey,
+        (k, cacheEntryGroup) -> {
+          if (cacheEntryGroup == null) {
+            cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
+            
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
+          }
+          ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup = 
cacheEntryGroup;
+          cacheEntryGroup.computeCacheEntry(
+              secondKey,
+              (sk, cacheEntry) -> {
+                if (cacheEntry == null) {
+                  cacheEntry =
+                      cacheEntryManager.createCacheEntry(secondKey, value, 
finalCacheEntryGroup);
+                  cacheEntryManager.put(cacheEntry);
+                  
usedMemorySize.getAndAdd(sizeComputer.computeSecondKeySize(sk));
+                } else {
+                  V existingValue = cacheEntry.getValue();
+                  if (existingValue != value && !existingValue.equals(value)) {
+                    cacheEntry.replaceValue(value);
+                    
usedMemorySize.getAndAdd(-sizeComputer.computeValueSize(existingValue));
+                  }
+                  // update the cache status
+                  cacheEntryManager.access(cacheEntry);
+                }
+                usedMemorySize.getAndAdd(sizeComputer.computeValueSize(value));
+                return cacheEntry;
+              });
+          return cacheEntryGroup;
+        });
+    return usedMemorySize.get();
+  }
+
+  /**
+   * Each thread putting new cache value only needs to evict cache values, 
total memory equals that
+   * the new cache value occupied.
+   */
+  private void executeCacheEviction(int targetSize) {
+    int evictedSize;
+    while (targetSize > 0 && cacheStats.memoryUsage() > 0) {
+      evictedSize = evictOneCacheEntry();
+      cacheStats.decreaseMemoryUsage(evictedSize);
+      targetSize -= evictedSize;
+    }
+  }
+
+  private int evictOneCacheEntry() {
+
+    ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
+    if (evictCacheEntry == null) {
+      return 0;
+    }
+    AtomicInteger evictedSize = new AtomicInteger(0);
+    
evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue()));
+
+    ICacheEntryGroup<FK, SK, V, T> belongedGroup = 
evictCacheEntry.getBelongedGroup();
+    belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
+    
evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
+
+    if (belongedGroup.isEmpty()) {
+      firstKeyMap.compute(
+          belongedGroup.getFirstKey(),
+          (firstKey, cacheEntryGroup) -> {
+            if (cacheEntryGroup == null) {
+              // has been removed by other threads
+              return null;
+            }
+            if (cacheEntryGroup.isEmpty()) {
+              
evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
+              return null;
+            }
+
+            // some other thread has put value to it
+            return cacheEntryGroup;
+          });
+    }
+    return evictedSize.get();
+  }
+
+  @Override
+  public void invalidateAll() {
+    readWriteLock.writeLock().lock();
+    try {
+      executeInvalidateAll();
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
+  private void executeInvalidateAll() {
+    firstKeyMap.clear();
+    cacheEntryManager.cleanUp();
+    cacheStats.resetMemoryUsage();
+  }
+
+  @Override
+  public void cleanUp() {
+    readWriteLock.writeLock().lock();
+    try {
+      executeInvalidateAll();
+      cacheStats.reset();
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public IDualKeyCacheStats stats() {
+    return cacheStats;
+  }
+
+  /**
+   * Since the capacity of one instance of ConcurrentHashMap is about 4 
million, a number of
+   * instances are united for more capacity.
+   */
+  private static class SegmentedConcurrentHashMap<K, V> {
+
+    private static final int SLOT_NUM = 31;
+
+    private final Map<K, V>[] maps = new ConcurrentHashMap[SLOT_NUM];
+
+    V get(K key) {
+      return getBelongedMap(key).get(key);
+    }
+
+    V compute(K key, BiFunction<? super K, ? super V, ? extends V> 
remappingFunction) {
+      return getBelongedMap(key).compute(key, remappingFunction);
+    }
+
+    void clear() {
+      synchronized (maps) {
+        for (int i = 0; i < SLOT_NUM; i++) {
+          maps[i] = null;
+        }
+      }
+    }
+
+    Map<K, V> getBelongedMap(K key) {
+      int slotIndex = key.hashCode() % SLOT_NUM;
+      slotIndex = slotIndex < 0 ? slotIndex + SLOT_NUM : slotIndex;
+      Map<K, V> map = maps[slotIndex];
+      if (map == null) {
+        synchronized (maps) {
+          map = maps[slotIndex];
+          if (map == null) {
+            map = new ConcurrentHashMap<>();
+            maps[slotIndex] = map;
+          }
+        }
+      }
+      return map;
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
new file mode 100644
index 0000000000..0211a98231
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+public enum DualKeyCachePolicy {
+  LRU;
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntry.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntry.java
new file mode 100644
index 0000000000..b9e3808935
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntry.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+/**
+ * This interface defines the behaviour of a cache entry holding the cache 
value. The cache entry is
+ * mainly accessed via second key from cache entry group and managed by cache 
entry manager for
+ * cache eviction.
+ *
+ * @param <SK> The second key of cache value.
+ * @param <V> The cache value.
+ */
+interface ICacheEntry<SK, V> {
+
+  SK getSecondKey();
+
+  V getValue();
+
+  ICacheEntryGroup getBelongedGroup();
+
+  void replaceValue(V newValue);
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryGroup.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryGroup.java
new file mode 100644
index 0000000000..186c813dab
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryGroup.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import java.util.function.BiFunction;
+
+/**
+ * This interface defines the behaviour of a cache entry group, which is 
mainly accessed via first
+ * key from dual key cache and holds all the second keys and cache values 
indexed by the first key.
+ *
+ * @param <FK> The first key of cache value.
+ * @param <SK> The second key of cache value.
+ * @param <V> The cache value.
+ * @param <T> The cache entry holding cache value.
+ */
+interface ICacheEntryGroup<FK, SK, V, T extends ICacheEntry<SK, V>> {
+
+  FK getFirstKey();
+
+  T getCacheEntry(SK secondKey);
+
+  T computeCacheEntry(SK secondKey, BiFunction<SK, T, T> computation);
+
+  T removeCacheEntry(SK secondKey);
+
+  boolean isEmpty();
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryManager.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryManager.java
new file mode 100644
index 0000000000..d67dc2afbb
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheEntryManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+/**
+ * This interface defines the behaviour of a cache entry manager, which takes 
the responsibility of
+ * cache value status management and cache eviction.
+ *
+ * @param <FK> The first key of cache value.
+ * @param <SK> The second key of cache value.
+ * @param <V> The cache value.
+ * @param <T> The cache entry holding cache value.
+ */
+interface ICacheEntryManager<FK, SK, V, T extends ICacheEntry<SK, V>> {
+
+  T createCacheEntry(SK secondKey, V value, ICacheEntryGroup<FK, SK, V, T> 
cacheEntryGroup);
+
+  void access(T cacheEntry);
+
+  void put(T cacheEntry);
+
+  T evict();
+
+  void cleanUp();
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheSizeComputer.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheSizeComputer.java
new file mode 100644
index 0000000000..0187080604
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/ICacheSizeComputer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+interface ICacheSizeComputer<FK, SK, V> {
+
+  int computeFirstKeySize(FK firstKey);
+
+  int computeSecondKeySize(SK secondKey);
+
+  int computeValueSize(V value);
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/LRUCacheEntryManager.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/LRUCacheEntryManager.java
new file mode 100644
index 0000000000..e6ba42fc36
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/LRUCacheEntryManager.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import java.util.Objects;
+import java.util.Random;
+
+/**
+ * This class implements the cache entry manager with LRU policy.
+ *
+ * @param <FK> The first key of cache value.
+ * @param <SK> The second key of cache value.
+ * @param <V> The cache value.
+ */
+class LRUCacheEntryManager<FK, SK, V>
+    implements ICacheEntryManager<FK, SK, V, 
LRUCacheEntryManager.LRUCacheEntry<SK, V>> {
+
+  private static final int SLOT_NUM = 128;
+
+  private final LRULinkedList[] lruLinkedLists = new LRULinkedList[SLOT_NUM];
+
+  private final Random idxGenerator = new Random();
+
+  @Override
+  public LRUCacheEntry<SK, V> createCacheEntry(
+      SK secondKey, V value, ICacheEntryGroup<FK, SK, V, LRUCacheEntry<SK, V>> 
cacheEntryGroup) {
+    return new LRUCacheEntry<>(secondKey, value, cacheEntryGroup);
+  }
+
+  @Override
+  public void access(LRUCacheEntry<SK, V> cacheEntry) {
+    getBelongedList(cacheEntry).moveToHead(cacheEntry);
+  }
+
+  @Override
+  public void put(LRUCacheEntry<SK, V> cacheEntry) {
+    getBelongedList(cacheEntry).add(cacheEntry);
+  }
+
+  @Override
+  public LRUCacheEntry<SK, V> evict() {
+    int startIndex = idxGenerator.nextInt(SLOT_NUM);
+    LRULinkedList lruLinkedList;
+    LRUCacheEntry<SK, V> cacheEntry;
+    for (int i = 0; i < SLOT_NUM; i++) {
+      if (startIndex == SLOT_NUM) {
+        startIndex = 0;
+      }
+      lruLinkedList = lruLinkedLists[startIndex];
+      if (lruLinkedList != null) {
+        cacheEntry = lruLinkedList.evict();
+        if (cacheEntry != null) {
+          return cacheEntry;
+        }
+      }
+      startIndex++;
+    }
+    return null;
+  }
+
+  @Override
+  public void cleanUp() {
+    synchronized (lruLinkedLists) {
+      for (int i = 0; i < SLOT_NUM; i++) {
+        lruLinkedLists[i] = null;
+      }
+    }
+  }
+
+  private LRULinkedList getBelongedList(LRUCacheEntry<SK, V> cacheEntry) {
+    int slotIndex = cacheEntry.hashCode() % SLOT_NUM;
+    slotIndex = slotIndex < 0 ? slotIndex + SLOT_NUM : slotIndex;
+    LRULinkedList lruLinkedList = lruLinkedLists[slotIndex];
+    if (lruLinkedList == null) {
+      synchronized (lruLinkedLists) {
+        lruLinkedList = lruLinkedLists[slotIndex];
+        if (lruLinkedList == null) {
+          lruLinkedList = new LRULinkedList();
+          lruLinkedLists[slotIndex] = lruLinkedList;
+        }
+      }
+    }
+    return lruLinkedList;
+  }
+
+  static class LRUCacheEntry<SK, V> implements ICacheEntry<SK, V> {
+
+    private final SK secondKey;
+    private final ICacheEntryGroup cacheEntryGroup;
+
+    private V value;
+
+    private LRUCacheEntry<SK, V> pre;
+    private LRUCacheEntry<SK, V> next;
+
+    private LRUCacheEntry(SK secondKey, V value, ICacheEntryGroup 
cacheEntryGroup) {
+      this.secondKey = secondKey;
+      this.value = value;
+      this.cacheEntryGroup = cacheEntryGroup;
+    }
+
+    @Override
+    public SK getSecondKey() {
+      return secondKey;
+    }
+
+    @Override
+    public V getValue() {
+      return value;
+    }
+
+    @Override
+    public ICacheEntryGroup getBelongedGroup() {
+      return cacheEntryGroup;
+    }
+
+    @Override
+    public void replaceValue(V newValue) {
+      this.value = newValue;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      LRUCacheEntry<?, ?> that = (LRUCacheEntry<?, ?>) o;
+      return Objects.equals(secondKey, that.secondKey)
+          && Objects.equals(cacheEntryGroup, that.cacheEntryGroup);
+    }
+
+    @Override
+    public int hashCode() {
+      return cacheEntryGroup.hashCode() * 31 + secondKey.hashCode();
+    }
+  }
+
+  private static class LRULinkedList {
+
+    private LRUCacheEntry head;
+    private LRUCacheEntry tail;
+
+    synchronized void add(LRUCacheEntry cacheEntry) {
+      if (head == null) {
+        head = cacheEntry;
+        tail = cacheEntry;
+        return;
+      }
+
+      head.pre = cacheEntry;
+      cacheEntry.next = head;
+
+      head = cacheEntry;
+    }
+
+    synchronized LRUCacheEntry evict() {
+      if (tail == null) {
+        return null;
+      }
+
+      LRUCacheEntry cacheEntry = tail;
+      tail = tail.pre;
+
+      if (tail == null) {
+        head = null;
+      } else {
+        tail.next = null;
+      }
+
+      cacheEntry.pre = null;
+
+      return cacheEntry;
+    }
+
+    synchronized void moveToHead(LRUCacheEntry cacheEntry) {
+      if (head == null) {
+        // this cache entry has been evicted and the cache list is empty
+        return;
+      }
+
+      if (cacheEntry.pre == null) {
+        // this entry is head
+        return;
+      }
+
+      cacheEntry.pre.next = cacheEntry.next;
+
+      if (cacheEntry.next != null) {
+        cacheEntry.next.pre = cacheEntry.pre;
+      }
+
+      cacheEntry.pre = null;
+
+      head.pre = cacheEntry;
+      cacheEntry.next = head;
+
+      head = cacheEntry;
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 002882d930..c743d530d3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -476,6 +476,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher 
{
 
   @Override
   public void invalidAllCache() {
-    DataNodeSchemaCache.getInstance().cleanUp();
+    DataNodeSchemaCache.getInstance().invalidateAll();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 503e0f0a06..66b4c540cf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -395,7 +395,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
-    DataNodeSchemaCache.getInstance().cleanUp();
+    DataNodeSchemaCache.getInstance().invalidateAll();
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
@@ -477,7 +477,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     cache.takeWriteLock();
     try {
       // todo implement precise timeseries clean rather than clean all
-      cache.cleanUp();
+      cache.invalidateAll();
     } finally {
       cache.releaseWriteLock();
     }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
 
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index 88148499a6..c5504f9c25 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -88,7 +88,6 @@ public class DataNodeSchemaCacheTest {
         TSDataType.BOOLEAN,
         schemaCacheEntryMap.get(new 
PartialPath("root.sg1.d1.s3")).getTsDataType());
     Assert.assertNull(schemaCacheEntryMap.get(new 
PartialPath("root.sg1.d1.s3")).getTagMap());
-    Assert.assertEquals(3, dataNodeSchemaCache.estimatedSize());
 
     String[] otherMeasurements = new String[3];
     otherMeasurements[0] = "s3";
@@ -120,7 +119,6 @@ public class DataNodeSchemaCacheTest {
         TSDataType.INT64,
         schemaCacheEntryMap.get(new 
PartialPath("root.sg1.d1.s5")).getTsDataType());
     Assert.assertNull(schemaCacheEntryMap.get(new 
PartialPath("root.sg1.d1.s4")).getTagMap());
-    Assert.assertEquals(5, dataNodeSchemaCache.estimatedSize());
   }
 
   @Test
@@ -182,12 +180,6 @@ public class DataNodeSchemaCacheTest {
     Assert.assertEquals(value3, cachedTimeValuePair3.getValue());
     Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
     Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
-
-    // invalid cache
-    dataNodeSchemaCache.invalidate(seriesPath1);
-    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath1));
-    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
-    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
   }
 
   private ISchemaTree generateSchemaTree1() throws IllegalPathException {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
 
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
new file mode 100644
index 0000000000..7184124927
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache;
+
+import 
org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCacheBuilder;
+import org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCachePolicy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DualKeyCacheTest {
+
+  @Test
+  public void testBasicReadPut() {
+    DualKeyCacheBuilder<String, String, String> dualKeyCacheBuilder = new 
DualKeyCacheBuilder<>();
+    IDualKeyCache<String, String, String> dualKeyCache =
+        dualKeyCacheBuilder
+            .cacheEvictionPolicy(DualKeyCachePolicy.LRU)
+            .memoryCapacity(300)
+            .firstKeySizeComputer(this::computeStringSize)
+            .secondKeySizeComputer(this::computeStringSize)
+            .valueSizeComputer(this::computeStringSize)
+            .build();
+
+    String[] firstKeyList = new String[] {"root.db.d1", "root.db.d2"};
+    String[] secondKeyList = new String[] {"s1", "s2"};
+    String[][] valueTable =
+        new String[][] {new String[] {"1-1", "1-2"}, new String[] {"2-1", 
"2-2"}};
+
+    for (int i = 0; i < firstKeyList.length; i++) {
+      for (int j = 0; j < secondKeyList.length; j++) {
+        dualKeyCache.put(firstKeyList[i], secondKeyList[j], valueTable[i][j]);
+      }
+    }
+
+    int firstKeyOfMissingEntry = -1, secondKeyOfMissingEntry = -1;
+    for (int i = 0; i < firstKeyList.length; i++) {
+      for (int j = 0; j < secondKeyList.length; j++) {
+        String value = dualKeyCache.get(firstKeyList[i], secondKeyList[j]);
+        if (value == null) {
+          if (firstKeyOfMissingEntry == -1) {
+            firstKeyOfMissingEntry = i;
+            secondKeyOfMissingEntry = j;
+          } else {
+            Assert.fail();
+          }
+        } else {
+          Assert.assertEquals(valueTable[i][j], value);
+        }
+      }
+    }
+
+    Assert.assertEquals(230, dualKeyCache.stats().memoryUsage());
+    Assert.assertEquals(4, dualKeyCache.stats().requestCount());
+    Assert.assertEquals(3, dualKeyCache.stats().hitCount());
+
+    dualKeyCache.put(
+        firstKeyList[firstKeyOfMissingEntry],
+        secondKeyList[secondKeyOfMissingEntry],
+        valueTable[firstKeyOfMissingEntry][secondKeyOfMissingEntry]);
+    Assert.assertEquals(230, dualKeyCache.stats().memoryUsage());
+
+    for (int i = 0; i < firstKeyList.length; i++) {
+      int finalI = i;
+      dualKeyCache.compute(
+          new IDualKeyCacheComputation<String, String, String>() {
+            @Override
+            public String getFirstKey() {
+              return firstKeyList[finalI];
+            }
+
+            @Override
+            public String[] getSecondKeyList() {
+              return secondKeyList;
+            }
+
+            @Override
+            public void computeValue(int index, String value) {
+              if (value != null) {
+                Assert.assertEquals(valueTable[finalI][index], value);
+              }
+            }
+          });
+    }
+
+    Assert.assertEquals(8, dualKeyCache.stats().requestCount());
+    Assert.assertEquals(6, dualKeyCache.stats().hitCount());
+  }
+
+  private int computeStringSize(String string) {
+    return 8 + 8 + 4 + 2 * string.length();
+  }
+}

Reply via email to