This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 29bc06cac [core] Manage the lookup file disk cache in task granularity (#3853)
29bc06cac is described below

commit 29bc06cac05c937b4c76afeb53570298139edd65
Author: Aitozi <[email protected]>
AuthorDate: Wed Jul 31 11:51:15 2024 +0800

    [core] Manage the lookup file disk cache in task granularity (#3853)
---
 .../org/apache/paimon/mergetree/LookupLevels.java  | 60 ++++++++++++++++------
 .../paimon/operation/KeyValueFileStoreWrite.java   | 23 +++++++--
 .../apache/paimon/table/query/LocalTableQuery.java | 19 +++++--
 .../paimon/mergetree/ContainsLevelsTest.java       |  5 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  8 +--
 5 files changed, 86 insertions(+), 29 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index d6024ebb8..1ba31816d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -39,6 +39,7 @@ import org.apache.paimon.utils.IOFunction;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher;
 import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
 import javax.annotation.Nullable;
@@ -50,6 +51,8 @@ import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -72,6 +75,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
     private final LookupStoreFactory lookupStoreFactory;
     private final Cache<String, LookupFile> lookupFiles;
     private final Function<Long, BloomFilter.Builder> bfGenerator;
+    private final Set<String> cachedFiles;
 
     public LookupLevels(
             Levels levels,
@@ -81,9 +85,8 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
             IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
             Supplier<File> localFileFactory,
             LookupStoreFactory lookupStoreFactory,
-            Duration fileRetention,
-            MemorySize maxDiskSize,
-            Function<Long, BloomFilter.Builder> bfGenerator) {
+            Function<Long, BloomFilter.Builder> bfGenerator,
+            Cache<String, LookupFile> lookupFiles) {
         this.levels = levels;
         this.keyComparator = keyComparator;
         this.keySerializer = new RowCompactedSerializer(keyType);
@@ -91,18 +94,23 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         this.fileReaderFactory = fileReaderFactory;
         this.localFileFactory = localFileFactory;
         this.lookupStoreFactory = lookupStoreFactory;
-        this.lookupFiles =
-                Caffeine.newBuilder()
-                        .expireAfterAccess(fileRetention)
-                        .maximumWeight(maxDiskSize.getKibiBytes())
-                        .weigher(this::fileWeigh)
-                        .removalListener(this::removalCallback)
-                        .executor(MoreExecutors.directExecutor())
-                        .build();
         this.bfGenerator = bfGenerator;
+        this.lookupFiles = lookupFiles;
+        this.cachedFiles = new HashSet<>();
         levels.addDropFileCallback(this);
     }
 
+    public static Cache<String, LookupFile> createCache(
+            Duration fileRetention, MemorySize maxDiskSize) {
+        return Caffeine.newBuilder()
+                .expireAfterAccess(fileRetention)
+                .maximumWeight(maxDiskSize.getKibiBytes())
+                .weigher((Weigher<String, LookupFile>) LookupLevels::fileWeigh)
+                .removalListener(LookupLevels::removalCallback)
+                .executor(MoreExecutors.directExecutor())
+                .build();
+    }
+
     public Levels getLevels() {
         return levels;
     }
@@ -112,6 +120,11 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         return lookupFiles;
     }
 
+    @VisibleForTesting
+    Set<String> cachedFiles() {
+        return cachedFiles;
+    }
+
     @Override
     public void notifyDropFile(String file) {
         lookupFiles.invalidate(file);
@@ -138,6 +151,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
 
         while (lookupFile == null || lookupFile.isClosed) {
             lookupFile = createLookupFile(file);
+            cachedFiles.add(file.fileName());
             lookupFiles.put(file.fileName(), lookupFile);
         }
 
@@ -151,11 +165,11 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
                 key, lookupFile.remoteFile().level(), valueBytes, 
file.fileName());
     }
 
-    private int fileWeigh(String file, LookupFile lookupFile) {
+    private static int fileWeigh(String file, LookupFile lookupFile) {
         return fileKibiBytes(lookupFile.localFile);
     }
 
-    private void removalCallback(String key, LookupFile file, RemovalCause 
cause) {
+    private static void removalCallback(String key, LookupFile file, 
RemovalCause cause) {
         if (file != null) {
             try {
                 file.close();
@@ -204,26 +218,37 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
             context = kvWriter.close();
         }
 
-        return new LookupFile(localFile, file, 
lookupStoreFactory.createReader(localFile, context));
+        return new LookupFile(
+                localFile, file, lookupStoreFactory.createReader(localFile, 
context), cachedFiles);
     }
 
     @Override
     public void close() throws IOException {
-        lookupFiles.invalidateAll();
+        Set<String> toClean = new HashSet<>(cachedFiles);
+        for (String cachedFile : toClean) {
+            lookupFiles.invalidate(cachedFile);
+        }
     }
 
-    private static class LookupFile implements Closeable {
+    /** Lookup file. */
+    public static class LookupFile implements Closeable {
 
         private final File localFile;
         private final DataFileMeta remoteFile;
         private final LookupStoreReader reader;
+        private final Set<String> cachedFiles;
 
         private boolean isClosed = false;
 
-        public LookupFile(File localFile, DataFileMeta remoteFile, 
LookupStoreReader reader) {
+        public LookupFile(
+                File localFile,
+                DataFileMeta remoteFile,
+                LookupStoreReader reader,
+                Set<String> cachedFiles) {
             this.localFile = localFile;
             this.remoteFile = remoteFile;
             this.reader = reader;
+            this.cachedFiles = cachedFiles;
         }
 
         @Nullable
@@ -240,6 +265,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         public void close() throws IOException {
             reader.close();
             isClosed = true;
+            cachedFiles.remove(remoteFile.fileName());
             FileIOUtils.deleteFileOrDirectory(localFile);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index df3242141..6a5b30aea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -71,6 +71,8 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.UserDefinedSeqComparator;
 
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,6 +104,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     private final RowType keyType;
     private final RowType valueType;
     @Nullable private final RecordLevelExpire recordLevelExpire;
+    private Cache<String, LookupLevels.LookupFile> lookupFileCache;
 
     public KeyValueFileStoreWrite(
             FileIO fileIO,
@@ -356,6 +359,13 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         cacheManager,
                         new 
RowCompactedSerializer(keyType).createSliceComparator());
         Options options = this.options.toConfiguration();
+        if (lookupFileCache == null) {
+            lookupFileCache =
+                    LookupLevels.createCache(
+                            
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
+                            
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
+        }
+
         return new LookupLevels<>(
                 levels,
                 keyComparatorSupplier.get(),
@@ -364,8 +374,15 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 readerFactory::createRecordReader,
                 () -> ioManager.createChannel().getPathFile(),
                 lookupStoreFactory,
-                options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
-                options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
-                bfGenerator(options));
+                bfGenerator(options),
+                lookupFileCache);
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (lookupFileCache != null) {
+            lookupFileCache.invalidateAll();
+        }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 9181fdb16..7475d6e8e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -41,6 +41,8 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyComparatorSupplier;
 import org.apache.paimon.utils.Preconditions;
 
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -70,6 +72,8 @@ public class LocalTableQuery implements TableQuery {
 
     private IOManager ioManager;
 
+    private Cache<String, LookupLevels.LookupFile> lookupFileCache;
+
     public LocalTableQuery(FileStoreTable table) {
         this.options = table.coreOptions();
         this.tableView = new HashMap<>();
@@ -130,6 +134,13 @@ public class LocalTableQuery implements TableQuery {
         KeyValueFileReaderFactory factory =
                 readerFactoryBuilder.build(partition, bucket, 
DeletionVector.emptyFactory());
         Options options = this.options.toConfiguration();
+        if (lookupFileCache == null) {
+            lookupFileCache =
+                    LookupLevels.createCache(
+                            
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
+                            
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
+        }
+
         LookupLevels<KeyValue> lookupLevels =
                 new LookupLevels<>(
                         levels,
@@ -148,9 +159,8 @@ public class LocalTableQuery implements TableQuery {
                                         .createChannel()
                                         .getPathFile(),
                         lookupStoreFactory,
-                        options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
-                        options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
-                        bfGenerator(options));
+                        bfGenerator(options),
+                        lookupFileCache);
 
         tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, 
lookupLevels);
     }
@@ -202,6 +212,9 @@ public class LocalTableQuery implements TableQuery {
                 bucket.getValue().close();
             }
         }
+        if (lookupFileCache != null) {
+            lookupFileCache.invalidateAll();
+        }
         tableView.clear();
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 781bab0df..07972b28a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -195,9 +195,8 @@ public class ContainsLevelsTest {
                 () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(
                         new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 
0.75, "none"),
-                Duration.ofHours(1),
-                maxDiskSize,
-                rowCount -> BloomFilter.builder(rowCount, 0.01));
+                rowCount -> BloomFilter.builder(rowCount, 0.01),
+                LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
     }
 
     private KeyValue kv(int key, int value) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 7b89f409d..a5d58cf13 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -193,6 +193,8 @@ public class LookupLevelsTest {
             assertThat(kv.level()).isEqualTo(1);
             assertThat(kv.value().getInt(1)).isEqualTo(i);
         }
+        assertThat(lookupLevels.lookupFiles().asMap().keySet())
+                .isEqualTo(lookupLevels.cachedFiles());
 
         // some files are invalided
         long fileNumber = lookupLevels.lookupFiles().estimatedSize();
@@ -202,6 +204,7 @@ public class LookupLevelsTest {
         
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);
 
         lookupLevels.close();
+        assertThat(lookupLevels.cachedFiles()).isEmpty();
         assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
     }
 
@@ -271,9 +274,8 @@ public class LookupLevelsTest {
                 () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(
                         new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 
0.75, "none"),
-                Duration.ofHours(1),
-                maxDiskSize,
-                rowCount -> BloomFilter.builder(rowCount, 0.05));
+                rowCount -> BloomFilter.builder(rowCount, 0.05),
+                LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
     }
 
     private KeyValue kv(int key, int value) {

Reply via email to