This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a7e114a5b6 Ensure concurrency safety when updating last cache
9a7e114a5b6 is described below

commit 9a7e114a5b62412cd3384f147f5c5224a30676c8
Author: Chen YZ <[email protected]>
AuthorDate: Wed Sep 27 08:21:09 2023 +0800

    Ensure concurrency safety when updating last cache
---
 .../analyze/cache/schema/DataNodeSchemaCache.java  | 43 +++++++++-----
 .../analyze/cache/schema/SchemaCacheEntry.java     | 16 +++++-
 .../cache/schema/TimeSeriesSchemaCache.java        | 62 +++++++++++++++------
 .../schema/dualkeycache/impl/DualKeyCacheImpl.java | 65 +++++++++++++---------
 .../dualkeycache/impl/FIFOCacheEntryManager.java   |  7 ++-
 .../schema/dualkeycache/impl/ICacheEntry.java      |  2 +
 .../dualkeycache/impl/LRUCacheEntryManager.java    |  7 ++-
 .../schema/lastcache/DataNodeLastCacheManager.java |  6 +-
 .../cache/dualkeycache/DualKeyCacheTest.java       |  4 +-
 9 files changed, 143 insertions(+), 69 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
index 90cebfee2bf..7b50491ebc0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
@@ -42,7 +43,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntPredicate;
 
 /**
  * This class takes the responsibility of metadata cache management of all DataRegions under
@@ -191,6 +193,7 @@ public class DataNodeSchemaCache {
   }
 
   /** get SchemaCacheEntry and update last cache */
+  @TestOnly
   public void updateLastCache(
       PartialPath devicePath,
       String measurement,
@@ -207,20 +210,25 @@ public class DataNodeSchemaCache {
       String[] measurements,
       MeasurementSchema[] measurementSchemas,
       boolean isAligned,
-      Function<Integer, TimeValuePair> timeValuePairProvider,
-      Function<Integer, Boolean> shouldUpdateProvider,
+      IntFunction<TimeValuePair> timeValuePairProvider,
+      IntPredicate shouldUpdateProvider,
       boolean highPriorityUpdate,
       Long latestFlushedTime) {
-    timeSeriesSchemaCache.updateLastCache(
-        database,
-        devicePath,
-        measurements,
-        measurementSchemas,
-        isAligned,
-        timeValuePairProvider,
-        shouldUpdateProvider,
-        highPriorityUpdate,
-        latestFlushedTime);
+    takeReadLock();
+    try {
+      timeSeriesSchemaCache.updateLastCache(
+          database,
+          devicePath,
+          measurements,
+          measurementSchemas,
+          isAligned,
+          timeValuePairProvider,
+          shouldUpdateProvider,
+          highPriorityUpdate,
+          latestFlushedTime);
+    } finally {
+      releaseReadLock();
+    }
   }
 
   /**
@@ -233,8 +241,13 @@ public class DataNodeSchemaCache {
       TimeValuePair timeValuePair,
       boolean highPriorityUpdate,
       Long latestFlushedTime) {
-    timeSeriesSchemaCache.updateLastCache(
-        storageGroup, measurementPath, timeValuePair, highPriorityUpdate, latestFlushedTime);
+    takeReadLock();
+    try {
+      timeSeriesSchemaCache.updateLastCache(
+          storageGroup, measurementPath, timeValuePair, highPriorityUpdate, latestFlushedTime);
+    } finally {
+      releaseReadLock();
+    }
   }
 
   public void invalidateAll() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
index 790558c9d9f..8df16a91ba5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.ILastCacheContainer;
 import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.LastCacheContainer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -76,14 +77,25 @@ public class SchemaCacheEntry implements IMeasurementSchemaInfo {
   }
 
   public ILastCacheContainer getLastCacheContainer() {
+    return lastCacheContainer;
+  }
+
+  public int updateLastCache(
+      TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
+
     if (lastCacheContainer == null) {
       synchronized (this) {
         if (lastCacheContainer == null) {
-          lastCacheContainer = new LastCacheContainer();
+          ILastCacheContainer tmp = new LastCacheContainer();
+          int changeSize = tmp.estimateSize();
+          changeSize += tmp.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
+          lastCacheContainer = tmp;
+          return changeSize;
         }
       }
     }
-    return lastCacheContainer;
+    return lastCacheContainer.updateCachedLast(
+        timeValuePair, highPriorityUpdate, latestFlushedTime);
   }
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
index 0d3321d9d73..a1b8f508f7e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.view.InsertNonWritableViewException;
@@ -46,7 +47,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntPredicate;
 
 public class TimeSeriesSchemaCache {
 
@@ -222,12 +224,6 @@ public class TimeSeriesSchemaCache {
     return new Pair<>(indexOfMissingMeasurements, missedPathStringList);
   }
 
-  public void put(ClusterSchemaTree schemaTree) {
-    for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
-      putSingleMeasurementPath(schemaTree.getBelongedDatabase(measurementPath), measurementPath);
-    }
-  }
-
   public void putSingleMeasurementPath(String storageGroup, MeasurementPath measurementPath) {
     SchemaCacheEntry schemaCacheEntry =
         new SchemaCacheEntry(
@@ -250,6 +246,7 @@ public class TimeSeriesSchemaCache {
   }
 
   /** get SchemaCacheEntry and update last cache */
+  @TestOnly
   public void updateLastCache(
       PartialPath devicePath,
       String measurement,
@@ -272,8 +269,8 @@ public class TimeSeriesSchemaCache {
       String[] measurements,
       MeasurementSchema[] measurementSchemas,
       boolean isAligned,
-      Function<Integer, TimeValuePair> timeValuePairProvider,
-      Function<Integer, Boolean> shouldUpdateProvider,
+      IntFunction<TimeValuePair> timeValuePairProvider,
+      IntPredicate shouldUpdateProvider,
       boolean highPriorityUpdate,
       Long latestFlushedTime) {
     SchemaCacheEntry entry;
@@ -292,7 +289,7 @@ public class TimeSeriesSchemaCache {
 
           @Override
           public int updateValue(int index, SchemaCacheEntry value) {
-            if (!shouldUpdateProvider.apply(index)) {
+            if (!shouldUpdateProvider.test(index)) {
               return 0;
             }
             if (value == null) {
@@ -316,10 +313,28 @@ public class TimeSeriesSchemaCache {
           }
         }
       }
-
-      DataNodeLastCacheManager.updateLastCache(
-          entry, timeValuePairProvider.apply(index), highPriorityUpdate, latestFlushedTime);
     }
+    dualKeyCache.update(
+        new IDualKeyCacheUpdating<PartialPath, String, SchemaCacheEntry>() {
+          @Override
+          public PartialPath getFirstKey() {
+            return devicePath;
+          }
+
+          @Override
+          public String[] getSecondKeyList() {
+            return missingMeasurements.stream().map(i -> measurements[i]).toArray(String[]::new);
+          }
+
+          @Override
+          public int updateValue(int index, SchemaCacheEntry value) {
+            return DataNodeLastCacheManager.updateLastCache(
+                value,
+                timeValuePairProvider.apply(missingMeasurements.get(index)),
+                highPriorityUpdate,
+                latestFlushedTime);
+          }
+        });
   }
 
   /**
@@ -342,16 +357,31 @@ public class TimeSeriesSchemaCache {
           entry =
               new SchemaCacheEntry(
                   storageGroup,
-                  (MeasurementSchema) measurementPath.getMeasurementSchema(),
+                  measurementPath.getMeasurementSchema(),
                   measurementPath.getTagMap(),
                   measurementPath.isUnderAlignedEntity());
           dualKeyCache.put(seriesPath.getDevicePath(), seriesPath.getMeasurement(), entry);
         }
       }
     }
+    dualKeyCache.update(
+        new IDualKeyCacheUpdating<PartialPath, String, SchemaCacheEntry>() {
+          @Override
+          public PartialPath getFirstKey() {
+            return measurementPath.getDevicePath();
+          }
 
-    DataNodeLastCacheManager.updateLastCache(
-        entry, timeValuePair, highPriorityUpdate, latestFlushedTime);
+          @Override
+          public String[] getSecondKeyList() {
+            return new String[] {measurementPath.getMeasurement()};
+          }
+
+          @Override
+          public int updateValue(int index, SchemaCacheEntry value) {
+            return DataNodeLastCacheManager.updateLastCache(
+                value, timeValuePair, highPriorityUpdate, latestFlushedTime);
+          }
+        });
   }
 
   public void invalidateAll() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
index aed8277024f..c02a3173ccd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
@@ -115,14 +115,22 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
         if (cacheEntry == null) {
           updating.updateValue(i, null);
         } else {
-          int changeSize = updating.updateValue(i, cacheEntry.getValue());
-          cacheEntryManager.access(cacheEntry);
-          if (changeSize != 0) {
-            cacheStats.increaseMemoryUsage(changeSize);
-            if (cacheStats.isExceedMemoryCapacity()) {
-              executeCacheEviction(changeSize);
+          int changeSize = 0;
+          synchronized (cacheEntry) {
+            if (cacheEntry.getBelongedGroup() != null) {
+              // Only update the value when the cache entry is not evicted.
+              // If the cache entry is evicted, getBelongedGroup is null.
+              // Synchronized is to guarantee the cache entry is not evicted during the update.
+              changeSize = updating.updateValue(i, cacheEntry.getValue());
+              cacheEntryManager.access(cacheEntry);
+              if (changeSize != 0) {
+                cacheStats.increaseMemoryUsage(changeSize);
+              }
             }
           }
+          if (changeSize != 0 && cacheStats.isExceedMemoryCapacity()) {
+            executeCacheEviction(changeSize);
+          }
           hitCount++;
         }
       }
@@ -194,31 +202,34 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
     if (evictCacheEntry == null) {
       return 0;
     }
-    AtomicInteger evictedSize = new AtomicInteger(0);
-    evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue()));
+    synchronized (evictCacheEntry) {
+      AtomicInteger evictedSize = new AtomicInteger(0);
+      evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue()));
 
-    ICacheEntryGroup<FK, SK, V, T> belongedGroup = evictCacheEntry.getBelongedGroup();
-    belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
-    evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
+      ICacheEntryGroup<FK, SK, V, T> belongedGroup = evictCacheEntry.getBelongedGroup();
+      evictCacheEntry.setBelongedGroup(null);
+      belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
+      evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
 
-    if (belongedGroup.isEmpty()) {
-      firstKeyMap.compute(
-          belongedGroup.getFirstKey(),
-          (firstKey, cacheEntryGroup) -> {
-            if (cacheEntryGroup == null) {
-              // has been removed by other threads
-              return null;
-            }
-            if (cacheEntryGroup.isEmpty()) {
-              evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
-              return null;
-            }
+      if (belongedGroup.isEmpty()) {
+        firstKeyMap.compute(
+            belongedGroup.getFirstKey(),
+            (firstKey, cacheEntryGroup) -> {
+              if (cacheEntryGroup == null) {
+                // has been removed by other threads
+                return null;
+              }
+              if (cacheEntryGroup.isEmpty()) {
+                evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
+                return null;
+              }
 
-            // some other thread has put value to it
-            return cacheEntryGroup;
-          });
+              // some other thread has put value to it
+              return cacheEntryGroup;
+            });
+      }
+      return evictedSize.get();
     }
-    return evictedSize.get();
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
index 4d3c488d8eb..661e559dd82 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java
@@ -105,7 +105,7 @@ public class FIFOCacheEntryManager<FK, SK, V>
   static class FIFOCacheEntry<SK, V> implements ICacheEntry<SK, V> {
 
     private final SK secondKey;
-    private final ICacheEntryGroup cacheEntryGroup;
+    private volatile ICacheEntryGroup cacheEntryGroup;
 
     private V value;
 
@@ -132,6 +132,11 @@ public class FIFOCacheEntryManager<FK, SK, V>
       return cacheEntryGroup;
     }
 
+    @Override
+    public void setBelongedGroup(ICacheEntryGroup belongedGroup) {
+      this.cacheEntryGroup = belongedGroup;
+    }
+
     @Override
     public void replaceValue(V newValue) {
       this.value = newValue;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java
index 87fc118c613..655b8c6e56a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java
@@ -35,5 +35,7 @@ interface ICacheEntry<SK, V> {
 
   ICacheEntryGroup getBelongedGroup();
 
+  void setBelongedGroup(ICacheEntryGroup belongedGroup);
+
   void replaceValue(V newValue);
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
index e5f8ff87496..a350c7c98ce 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java
@@ -103,7 +103,7 @@ class LRUCacheEntryManager<FK, SK, V>
   static class LRUCacheEntry<SK, V> implements ICacheEntry<SK, V> {
 
     private final SK secondKey;
-    private final ICacheEntryGroup cacheEntryGroup;
+    private volatile ICacheEntryGroup cacheEntryGroup;
 
     private V value;
 
@@ -131,6 +131,11 @@ class LRUCacheEntryManager<FK, SK, V>
       return cacheEntryGroup;
     }
 
+    @Override
+    public void setBelongedGroup(ICacheEntryGroup belongedGroup) {
+      this.cacheEntryGroup = belongedGroup;
+    }
+
     @Override
     public void replaceValue(V newValue) {
       this.value = newValue;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java
index 678c7e8b430..6790f2cb645 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java
@@ -43,7 +43,7 @@ public class DataNodeLastCacheManager {
       return null;
     }
     ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer();
-    return lastCacheContainer.getCachedLast();
+    return lastCacheContainer == null ? null : lastCacheContainer.getCachedLast();
   }
 
   /**
@@ -63,8 +63,6 @@ public class DataNodeLastCacheManager {
     if (!CACHE_ENABLED || null == entry) {
       return 0;
     }
-    ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer();
-    return lastCacheContainer.updateCachedLast(
-        timeValuePair, highPriorityUpdate, latestFlushedTime);
+    return entry.updateLastCache(timeValuePair, highPriorityUpdate, latestFlushedTime);
   }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
index 2db2b717a31..4cef9502f8e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
@@ -189,9 +189,7 @@ public class DualKeyCacheTest {
           }
         });
     int tmp = SchemaCacheEntry.estimateSize(schemaCacheEntry);
-    schemaCacheEntry
-        .getLastCacheContainer()
-        .updateCachedLast(new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)), true, 0L);
+    schemaCacheEntry.updateLastCache(new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)), true, 0L);
     expectedSize += (SchemaCacheEntry.estimateSize(schemaCacheEntry) - tmp) * 2;
     Assert.assertEquals(expectedSize, dualKeyCache.stats().memoryUsage());
   }

Reply via email to