This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d66fc112 [core] Extract LookupFile from LookupLevels
6d66fc112 is described below

commit 6d66fc112851e332daadca2e9db296e747f51f41
Author: Jingsong <[email protected]>
AuthorDate: Wed Jul 31 12:07:46 2024 +0800

    [core] Extract LookupFile from LookupLevels
---
 .../org/apache/paimon/mergetree/LookupFile.java    | 108 ++++++++++++++++++++
 .../org/apache/paimon/mergetree/LookupLevels.java  | 109 ++++-----------------
 .../paimon/operation/KeyValueFileStoreWrite.java   |   5 +-
 .../apache/paimon/table/query/LocalTableQuery.java |   5 +-
 .../paimon/mergetree/ContainsLevelsTest.java       |   2 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |   2 +-
 6 files changed, 135 insertions(+), 96 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
new file mode 100644
index 000000000..429ede810
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.lookup.LookupStoreReader;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.FileIOUtils;
+
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+
+import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Lookup file for cache remote file to local. */
+public class LookupFile implements Closeable {
+
+    private final File localFile;
+    private final DataFileMeta remoteFile;
+    private final LookupStoreReader reader;
+    private final Runnable callback;
+
+    private boolean isClosed = false;
+
+    public LookupFile(
+            File localFile, DataFileMeta remoteFile, LookupStoreReader reader, Runnable callback) {
+        this.localFile = localFile;
+        this.remoteFile = remoteFile;
+        this.reader = reader;
+        this.callback = callback;
+    }
+
+    @Nullable
+    public byte[] get(byte[] key) throws IOException {
+        checkArgument(!isClosed);
+        return reader.lookup(key);
+    }
+
+    public DataFileMeta remoteFile() {
+        return remoteFile;
+    }
+
+    public boolean isClosed() {
+        return isClosed;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+        isClosed = true;
+        callback.run();
+        FileIOUtils.deleteFileOrDirectory(localFile);
+    }
+
+    // ==================== Cache for Local File ======================
+
+    public static Cache<String, LookupFile> createCache(
+            Duration fileRetention, MemorySize maxDiskSize) {
+        return Caffeine.newBuilder()
+                .expireAfterAccess(fileRetention)
+                .maximumWeight(maxDiskSize.getKibiBytes())
+                .weigher(LookupFile::fileWeigh)
+                .removalListener(LookupFile::removalCallback)
+                .executor(MoreExecutors.directExecutor())
+                .build();
+    }
+
+    private static int fileWeigh(String file, LookupFile lookupFile) {
+        return fileKibiBytes(lookupFile.localFile);
+    }
+
+    private static void removalCallback(String file, LookupFile lookupFile, RemovalCause cause) {
+        if (lookupFile != null) {
+            try {
+                lookupFile.close();
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index 1ba31816d..6cb3bc61b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -24,10 +24,8 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.lookup.LookupStoreFactory;
-import org.apache.paimon.lookup.LookupStoreReader;
 import org.apache.paimon.lookup.LookupStoreWriter;
 import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.types.RowKind;
@@ -37,18 +35,12 @@ import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.IOFunction;
 
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
-import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
-import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher;
-import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -57,8 +49,6 @@ import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE;
 import static org.apache.paimon.utils.VarLengthIntUtils.decodeLong;
 import static org.apache.paimon.utils.VarLengthIntUtils.encodeLong;
@@ -73,9 +63,10 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
     private final IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory;
     private final Supplier<File> localFileFactory;
     private final LookupStoreFactory lookupStoreFactory;
-    private final Cache<String, LookupFile> lookupFiles;
     private final Function<Long, BloomFilter.Builder> bfGenerator;
-    private final Set<String> cachedFiles;
+
+    private final Cache<String, LookupFile> lookupFileCache;
+    private final Set<String> ownCachedFiles;
 
     public LookupLevels(
             Levels levels,
@@ -86,7 +77,7 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
             Supplier<File> localFileFactory,
             LookupStoreFactory lookupStoreFactory,
             Function<Long, BloomFilter.Builder> bfGenerator,
-            Cache<String, LookupFile> lookupFiles) {
+            Cache<String, LookupFile> lookupFileCache) {
         this.levels = levels;
         this.keyComparator = keyComparator;
         this.keySerializer = new RowCompactedSerializer(keyType);
@@ -95,39 +86,28 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
         this.localFileFactory = localFileFactory;
         this.lookupStoreFactory = lookupStoreFactory;
         this.bfGenerator = bfGenerator;
-        this.lookupFiles = lookupFiles;
-        this.cachedFiles = new HashSet<>();
+        this.lookupFileCache = lookupFileCache;
+        this.ownCachedFiles = new HashSet<>();
         levels.addDropFileCallback(this);
     }
 
-    public static Cache<String, LookupFile> createCache(
-            Duration fileRetention, MemorySize maxDiskSize) {
-        return Caffeine.newBuilder()
-                .expireAfterAccess(fileRetention)
-                .maximumWeight(maxDiskSize.getKibiBytes())
-                .weigher((Weigher<String, LookupFile>) LookupLevels::fileWeigh)
-                .removalListener(LookupLevels::removalCallback)
-                .executor(MoreExecutors.directExecutor())
-                .build();
-    }
-
     public Levels getLevels() {
         return levels;
     }
 
     @VisibleForTesting
     Cache<String, LookupFile> lookupFiles() {
-        return lookupFiles;
+        return lookupFileCache;
     }
 
     @VisibleForTesting
     Set<String> cachedFiles() {
-        return cachedFiles;
+        return ownCachedFiles;
     }
 
     @Override
     public void notifyDropFile(String file) {
-        lookupFiles.invalidate(file);
+        lookupFileCache.invalidate(file);
     }
 
     @Nullable
@@ -147,12 +127,11 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
 
     @Nullable
     private T lookup(InternalRow key, DataFileMeta file) throws IOException {
-        LookupFile lookupFile = lookupFiles.getIfPresent(file.fileName());
+        LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());
 
-        while (lookupFile == null || lookupFile.isClosed) {
+        while (lookupFile == null || lookupFile.isClosed()) {
             lookupFile = createLookupFile(file);
-            cachedFiles.add(file.fileName());
-            lookupFiles.put(file.fileName(), lookupFile);
+            lookupFileCache.put(file.fileName(), lookupFile);
         }
 
         byte[] keyBytes = keySerializer.serializeToBytes(key);
@@ -165,20 +144,6 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
                 key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
     }
 
-    private static int fileWeigh(String file, LookupFile lookupFile) {
-        return fileKibiBytes(lookupFile.localFile);
-    }
-
-    private static void removalCallback(String key, LookupFile file, RemovalCause cause) {
-        if (file != null) {
-            try {
-                file.close();
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-        }
-    }
-
     private LookupFile createLookupFile(DataFileMeta file) throws IOException {
         File localFile = localFileFactory.get();
         if (!localFile.createNewFile()) {
@@ -218,55 +183,19 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
             context = kvWriter.close();
         }
 
+        ownCachedFiles.add(file.fileName());
         return new LookupFile(
-                localFile, file, lookupStoreFactory.createReader(localFile, context), cachedFiles);
+                localFile,
+                file,
+                lookupStoreFactory.createReader(localFile, context),
+                () -> ownCachedFiles.remove(file.fileName()));
     }
 
     @Override
     public void close() throws IOException {
-        Set<String> toClean = new HashSet<>(cachedFiles);
+        Set<String> toClean = new HashSet<>(ownCachedFiles);
         for (String cachedFile : toClean) {
-            lookupFiles.invalidate(cachedFile);
-        }
-    }
-
-    /** Lookup file. */
-    public static class LookupFile implements Closeable {
-
-        private final File localFile;
-        private final DataFileMeta remoteFile;
-        private final LookupStoreReader reader;
-        private final Set<String> cachedFiles;
-
-        private boolean isClosed = false;
-
-        public LookupFile(
-                File localFile,
-                DataFileMeta remoteFile,
-                LookupStoreReader reader,
-                Set<String> cachedFiles) {
-            this.localFile = localFile;
-            this.remoteFile = remoteFile;
-            this.reader = reader;
-            this.cachedFiles = cachedFiles;
-        }
-
-        @Nullable
-        public byte[] get(byte[] key) throws IOException {
-            checkArgument(!isClosed);
-            return reader.lookup(key);
-        }
-
-        public DataFileMeta remoteFile() {
-            return remoteFile;
-        }
-
-        @Override
-        public void close() throws IOException {
-            reader.close();
-            isClosed = true;
-            cachedFiles.remove(remoteFile.fileName());
-            FileIOUtils.deleteFileOrDirectory(localFile);
+            lookupFileCache.invalidate(cachedFile);
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 6a5b30aea..85a796d2e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -43,6 +43,7 @@ import org.apache.paimon.io.RecordLevelExpire;
 import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.Levels;
+import org.apache.paimon.mergetree.LookupFile;
 import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.mergetree.LookupLevels.ContainsValueProcessor;
 import org.apache.paimon.mergetree.LookupLevels.KeyValueProcessor;
@@ -104,7 +105,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
     private final RowType keyType;
     private final RowType valueType;
     @Nullable private final RecordLevelExpire recordLevelExpire;
-    private Cache<String, LookupLevels.LookupFile> lookupFileCache;
+    @Nullable private Cache<String, LookupFile> lookupFileCache;
 
     public KeyValueFileStoreWrite(
             FileIO fileIO,
@@ -361,7 +362,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
         Options options = this.options.toConfiguration();
         if (lookupFileCache == null) {
             lookupFileCache =
-                    LookupLevels.createCache(
+                    LookupFile.createCache(
                             options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
                             options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 7475d6e8e..f0a32d101 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -34,6 +34,7 @@ import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.mergetree.Levels;
+import org.apache.paimon.mergetree.LookupFile;
 import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
@@ -72,7 +73,7 @@ public class LocalTableQuery implements TableQuery {
 
     private IOManager ioManager;
 
-    private Cache<String, LookupLevels.LookupFile> lookupFileCache;
+    @Nullable private Cache<String, LookupFile> lookupFileCache;
 
     public LocalTableQuery(FileStoreTable table) {
         this.options = table.coreOptions();
@@ -136,7 +137,7 @@ public class LocalTableQuery implements TableQuery {
         Options options = this.options.toConfiguration();
         if (lookupFileCache == null) {
             lookupFileCache =
-                    LookupLevels.createCache(
+                    LookupFile.createCache(
                             options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
                             options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 07972b28a..086548547 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -196,7 +196,7 @@ public class ContainsLevelsTest {
                 new HashLookupStoreFactory(
                         new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"),
                 rowCount -> BloomFilter.builder(rowCount, 0.01),
-                LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
+                LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
     }
 
     private KeyValue kv(int key, int value) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index a5d58cf13..9945b54b9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -275,7 +275,7 @@ public class LookupLevelsTest {
                 new HashLookupStoreFactory(
                         new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"),
                 rowCount -> BloomFilter.builder(rowCount, 0.05),
-                LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
+                LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
     }
 
     private KeyValue kv(int key, int value) {

Reply via email to