This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 29bc06cac [core] Manage the lookup file disk cache in task granularity
(#3853)
29bc06cac is described below
commit 29bc06cac05c937b4c76afeb53570298139edd65
Author: Aitozi <[email protected]>
AuthorDate: Wed Jul 31 11:51:15 2024 +0800
[core] Manage the lookup file disk cache in task granularity (#3853)
---
.../org/apache/paimon/mergetree/LookupLevels.java | 60 ++++++++++++++++------
.../paimon/operation/KeyValueFileStoreWrite.java | 23 +++++++--
.../apache/paimon/table/query/LocalTableQuery.java | 19 +++++--
.../paimon/mergetree/ContainsLevelsTest.java | 5 +-
.../apache/paimon/mergetree/LookupLevelsTest.java | 8 +--
5 files changed, 86 insertions(+), 29 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index d6024ebb8..1ba31816d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -39,6 +39,7 @@ import org.apache.paimon.utils.IOFunction;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher;
import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import javax.annotation.Nullable;
@@ -50,6 +51,8 @@ import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -72,6 +75,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
private final LookupStoreFactory lookupStoreFactory;
private final Cache<String, LookupFile> lookupFiles;
private final Function<Long, BloomFilter.Builder> bfGenerator;
+ private final Set<String> cachedFiles;
public LookupLevels(
Levels levels,
@@ -81,9 +85,8 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Supplier<File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
- Duration fileRetention,
- MemorySize maxDiskSize,
- Function<Long, BloomFilter.Builder> bfGenerator) {
+ Function<Long, BloomFilter.Builder> bfGenerator,
+ Cache<String, LookupFile> lookupFiles) {
this.levels = levels;
this.keyComparator = keyComparator;
this.keySerializer = new RowCompactedSerializer(keyType);
@@ -91,18 +94,23 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
this.fileReaderFactory = fileReaderFactory;
this.localFileFactory = localFileFactory;
this.lookupStoreFactory = lookupStoreFactory;
- this.lookupFiles =
- Caffeine.newBuilder()
- .expireAfterAccess(fileRetention)
- .maximumWeight(maxDiskSize.getKibiBytes())
- .weigher(this::fileWeigh)
- .removalListener(this::removalCallback)
- .executor(MoreExecutors.directExecutor())
- .build();
this.bfGenerator = bfGenerator;
+ this.lookupFiles = lookupFiles;
+ this.cachedFiles = new HashSet<>();
levels.addDropFileCallback(this);
}
+ /**
+ * Creates the lookup file cache, keyed by data file name, that {@link LookupLevels}
+ * instances put their {@link LookupFile}s into. Callers in this change create one cache per
+ * {@code KeyValueFileStoreWrite} / {@code LocalTableQuery} and pass it to every
+ * LookupLevels they build, so the disk budget is shared across those instances.
+ *
+ * @param fileRetention entries not accessed for this duration expire
+ * @param maxDiskSize maximum total weight of the cache; each entry is weighed by
+ *     {@code fileWeigh} (presumably the local file size in KiB, to match
+ *     {@code getKibiBytes()} below)
+ * @return a cache whose removal listener closes the removed {@link LookupFile}
+ */
+ public static Cache<String, LookupFile> createCache(
+ Duration fileRetention, MemorySize maxDiskSize) {
+ return Caffeine.newBuilder()
+ .expireAfterAccess(fileRetention)
+ .maximumWeight(maxDiskSize.getKibiBytes())
+ .weigher((Weigher<String, LookupFile>) LookupLevels::fileWeigh)
+ .removalListener(LookupLevels::removalCallback)
+ // Direct executor: removal callbacks (which close the LookupFile) run on the calling
+ // thread, so the owner's cachedFiles is already updated when invalidate() returns.
+ .executor(MoreExecutors.directExecutor())
+ .build();
+ }
+
public Levels getLevels() {
return levels;
}
@@ -112,6 +120,11 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
return lookupFiles;
}
+ /**
+ * Returns the live (uncopied) set of names of files this instance created lookup files for
+ * and whose {@link LookupFile} has not been closed yet.
+ */
+ @VisibleForTesting
+ Set<String> cachedFiles() {
+ return cachedFiles;
+ }
+
@Override
public void notifyDropFile(String file) {
lookupFiles.invalidate(file);
@@ -138,6 +151,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
while (lookupFile == null || lookupFile.isClosed) {
lookupFile = createLookupFile(file);
+ cachedFiles.add(file.fileName());
lookupFiles.put(file.fileName(), lookupFile);
}
@@ -151,11 +165,11 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
key, lookupFile.remoteFile().level(), valueBytes,
file.fileName());
}
- private int fileWeigh(String file, LookupFile lookupFile) {
+ // Static so createCache() can reference it without a LookupLevels instance. Its unit must
+ // match the cache's maximumWeight there (maxDiskSize.getKibiBytes()).
+ private static int fileWeigh(String file, LookupFile lookupFile) {
return fileKibiBytes(lookupFile.localFile);
}
- private void removalCallback(String key, LookupFile file, RemovalCause
cause) {
+ private static void removalCallback(String key, LookupFile file,
RemovalCause cause) {
if (file != null) {
try {
file.close();
@@ -204,26 +218,37 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
context = kvWriter.close();
}
- return new LookupFile(localFile, file,
lookupStoreFactory.createReader(localFile, context));
+ return new LookupFile(
+ localFile, file, lookupStoreFactory.createReader(localFile,
context), cachedFiles);
}
@Override
public void close() throws IOException {
- lookupFiles.invalidateAll();
+ // The cache may be shared with other LookupLevels, so only invalidate the files this
+ // instance created. Iterate over a copy: each invalidation closes the LookupFile, and
+ // LookupFile#close() removes its name from cachedFiles.
+ Set<String> toClean = new HashSet<>(cachedFiles);
+ for (String cachedFile : toClean) {
+ lookupFiles.invalidate(cachedFile);
+ }
}
- private static class LookupFile implements Closeable {
+ /** Lookup file. */
+ public static class LookupFile implements Closeable {
private final File localFile;
private final DataFileMeta remoteFile;
private final LookupStoreReader reader;
+ private final Set<String> cachedFiles;
private boolean isClosed = false;
- public LookupFile(File localFile, DataFileMeta remoteFile,
LookupStoreReader reader) {
+ /**
+ * Creates a lookup file backed by {@code localFile} and read through {@code reader}.
+ *
+ * @param cachedFiles set of cached file names owned by the creating {@link LookupLevels};
+ *     {@link #close()} removes {@code remoteFile}'s name from it
+ */
+ public LookupFile(
+ File localFile,
+ DataFileMeta remoteFile,
+ LookupStoreReader reader,
+ Set<String> cachedFiles) {
this.localFile = localFile;
this.remoteFile = remoteFile;
this.reader = reader;
+ this.cachedFiles = cachedFiles;
}
@Nullable
@@ -240,6 +265,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
public void close() throws IOException {
+ // NOTE(review): if reader.close() throws, the steps below are skipped, leaving the local
+ // file on disk and its name in cachedFiles; consider wrapping them in a finally.
reader.close();
isClosed = true;
+ // Keep the owner's cachedFiles in sync with the cache: this file is no longer cached.
+ cachedFiles.remove(remoteFile.fileName());
FileIOUtils.deleteFileOrDirectory(localFile);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index df3242141..6a5b30aea 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -71,6 +71,8 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,6 +104,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final RowType keyType;
private final RowType valueType;
@Nullable private final RecordLevelExpire recordLevelExpire;
+ private Cache<String, LookupLevels.LookupFile> lookupFileCache;
public KeyValueFileStoreWrite(
FileIO fileIO,
@@ -356,6 +359,13 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
cacheManager,
new
RowCompactedSerializer(keyType).createSliceComparator());
Options options = this.options.toConfiguration();
+ if (lookupFileCache == null) {
+ lookupFileCache =
+ LookupLevels.createCache(
+
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
+
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
+ }
+
return new LookupLevels<>(
levels,
keyComparatorSupplier.get(),
@@ -364,8 +374,15 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
readerFactory::createRecordReader,
() -> ioManager.createChannel().getPathFile(),
lookupStoreFactory,
- options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
- options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
- bfGenerator(options));
+ bfGenerator(options),
+ lookupFileCache);
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Invalidate the shared lookup file cache even if super.close() fails: its removal
+ // listener closes each remaining LookupFile, which deletes the local file. Without the
+ // finally, a failing close would leave those files on disk.
+ try {
+ super.close();
+ } finally {
+ if (lookupFileCache != null) {
+ lookupFileCache.invalidateAll();
+ }
+ }
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 9181fdb16..7475d6e8e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -41,6 +41,8 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -70,6 +72,8 @@ public class LocalTableQuery implements TableQuery {
private IOManager ioManager;
+ private Cache<String, LookupLevels.LookupFile> lookupFileCache;
+
public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
@@ -130,6 +134,13 @@ public class LocalTableQuery implements TableQuery {
KeyValueFileReaderFactory factory =
readerFactoryBuilder.build(partition, bucket,
DeletionVector.emptyFactory());
Options options = this.options.toConfiguration();
+ if (lookupFileCache == null) {
+ lookupFileCache =
+ LookupLevels.createCache(
+
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
+
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
+ }
+
LookupLevels<KeyValue> lookupLevels =
new LookupLevels<>(
levels,
@@ -148,9 +159,8 @@ public class LocalTableQuery implements TableQuery {
.createChannel()
.getPathFile(),
lookupStoreFactory,
- options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
- options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
- bfGenerator(options));
+ bfGenerator(options),
+ lookupFileCache);
tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket,
lookupLevels);
}
@@ -202,6 +212,9 @@ public class LocalTableQuery implements TableQuery {
bucket.getValue().close();
}
}
+ if (lookupFileCache != null) {
+ lookupFileCache.invalidateAll();
+ }
tableView.clear();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 781bab0df..07972b28a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -195,9 +195,8 @@ public class ContainsLevelsTest {
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048,
0.75, "none"),
- Duration.ofHours(1),
- maxDiskSize,
- rowCount -> BloomFilter.builder(rowCount, 0.01));
+ rowCount -> BloomFilter.builder(rowCount, 0.01),
+ LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
}
private KeyValue kv(int key, int value) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 7b89f409d..a5d58cf13 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -193,6 +193,8 @@ public class LookupLevelsTest {
assertThat(kv.level()).isEqualTo(1);
assertThat(kv.value().getInt(1)).isEqualTo(i);
}
+ assertThat(lookupLevels.lookupFiles().asMap().keySet())
+ .isEqualTo(lookupLevels.cachedFiles());
// some files are invalided
long fileNumber = lookupLevels.lookupFiles().estimatedSize();
@@ -202,6 +204,7 @@ public class LookupLevelsTest {
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);
lookupLevels.close();
+ assertThat(lookupLevels.cachedFiles()).isEmpty();
assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
}
@@ -271,9 +274,8 @@ public class LookupLevelsTest {
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048,
0.75, "none"),
- Duration.ofHours(1),
- maxDiskSize,
- rowCount -> BloomFilter.builder(rowCount, 0.05));
+ rowCount -> BloomFilter.builder(rowCount, 0.05),
+ LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
}
private KeyValue kv(int key, int value) {