This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6d66fc112 [core] Extract LookupFile from LookupLevels
6d66fc112 is described below
commit 6d66fc112851e332daadca2e9db296e747f51f41
Author: Jingsong <[email protected]>
AuthorDate: Wed Jul 31 12:07:46 2024 +0800
[core] Extract LookupFile from LookupLevels
---
.../org/apache/paimon/mergetree/LookupFile.java | 108 ++++++++++++++++++++
.../org/apache/paimon/mergetree/LookupLevels.java | 109 ++++-----------------
.../paimon/operation/KeyValueFileStoreWrite.java | 5 +-
.../apache/paimon/table/query/LocalTableQuery.java | 5 +-
.../paimon/mergetree/ContainsLevelsTest.java | 2 +-
.../apache/paimon/mergetree/LookupLevelsTest.java | 2 +-
6 files changed, 135 insertions(+), 96 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
new file mode 100644
index 000000000..429ede810
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.lookup.LookupStoreReader;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.FileIOUtils;
+
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+
+import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A lookup store file kept on local disk for a remote data file, together with the reader used
+ * to query it.
+ *
+ * <p>Closing an instance closes the reader, runs the callback supplied by the owner and deletes
+ * the local file. Instances are meant to be held in the cache built by {@link
+ * #createCache(Duration, MemorySize)}, whose removal listener closes them.
+ */
+public class LookupFile implements Closeable {
+
+    private final File localFile;
+    private final DataFileMeta remoteFile;
+    private final LookupStoreReader reader;
+    private final Runnable callback;
+
+    private boolean isClosed = false;
+
+    /**
+     * @param localFile local file backing {@code reader}; deleted when this object is closed
+     * @param remoteFile metadata of the remote data file this lookup file was created for
+     * @param reader reader used to serve {@link #get(byte[])}
+     * @param callback hook run once when this object is closed
+     */
+    public LookupFile(
+            File localFile, DataFileMeta remoteFile, LookupStoreReader reader, Runnable callback) {
+        this.localFile = localFile;
+        this.remoteFile = remoteFile;
+        this.reader = reader;
+        this.callback = callback;
+    }
+
+    /**
+     * Looks up {@code key} through the underlying reader.
+     *
+     * <p>Fails the {@code checkArgument} precondition if this file has already been closed.
+     *
+     * @param key serialized key bytes
+     * @return whatever the reader returns for the key, possibly {@code null}
+     * @throws IOException if the reader fails
+     */
+    @Nullable
+    public byte[] get(byte[] key) throws IOException {
+        checkArgument(!isClosed);
+        return reader.lookup(key);
+    }
+
+    /** Returns the metadata of the remote data file this lookup file belongs to. */
+    public DataFileMeta remoteFile() {
+        return remoteFile;
+    }
+
+    /** Returns {@code true} once {@link #close()} has been called. */
+    public boolean isClosed() {
+        return isClosed;
+    }
+
+    /**
+     * Closes the reader, runs the close callback and deletes the local file.
+     *
+     * <p>Calling this method again after the first call has no effect, as required by {@link
+     * Closeable#close()}. The callback and the file deletion are performed even when closing the
+     * reader fails, so a broken reader cannot leave the local file behind on disk.
+     *
+     * @throws IOException if closing the reader or deleting the local file fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (isClosed) {
+            // Already released; do not re-run the callback or touch the reader again.
+            return;
+        }
+        try {
+            reader.close();
+        } finally {
+            // Mark closed first so that no further lookups are accepted, whatever happens below.
+            isClosed = true;
+            try {
+                callback.run();
+            } finally {
+                FileIOUtils.deleteFileOrDirectory(localFile);
+            }
+        }
+    }
+
+    // ==================== Cache for Local File ======================
+
+    /**
+     * Creates the cache that holds {@link LookupFile}s by file name.
+     *
+     * <p>Entries expire {@code fileRetention} after their last access. The total weight is
+     * bounded by {@code maxDiskSize} in KiB, with each entry weighed by {@link
+     * LookupUtils#fileKibiBytes} of its local file. Removed entries are closed by the removal
+     * listener; the direct executor makes that happen on the thread that triggers the removal.
+     *
+     * @param fileRetention idle time after which a cached file is evicted
+     * @param maxDiskSize upper bound for the summed weight of all cached files
+     * @return a new, empty cache
+     */
+    public static Cache<String, LookupFile> createCache(
+            Duration fileRetention, MemorySize maxDiskSize) {
+        return Caffeine.newBuilder()
+                .expireAfterAccess(fileRetention)
+                .maximumWeight(maxDiskSize.getKibiBytes())
+                .weigher(LookupFile::fileWeigh)
+                .removalListener(LookupFile::removalCallback)
+                .executor(MoreExecutors.directExecutor())
+                .build();
+    }
+
+    /** Cache weigher: the weight of an entry is derived from its local file only. */
+    private static int fileWeigh(String file, LookupFile lookupFile) {
+        return fileKibiBytes(lookupFile.localFile);
+    }
+
+    /**
+     * Cache removal listener: closes the removed file, if any, rethrowing an {@link IOException}
+     * as {@link UncheckedIOException} because the listener cannot throw checked exceptions.
+     */
+    private static void removalCallback(String file, LookupFile lookupFile, RemovalCause cause) {
+        if (lookupFile != null) {
+            try {
+                lookupFile.close();
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index 1ba31816d..6cb3bc61b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -24,10 +24,8 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.lookup.LookupStoreFactory;
-import org.apache.paimon.lookup.LookupStoreReader;
import org.apache.paimon.lookup.LookupStoreWriter;
import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.RowKind;
@@ -37,18 +35,12 @@ import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.IOFunction;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
-import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
-import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher;
-import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.time.Duration;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
@@ -57,8 +49,6 @@ import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;
-import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE;
import static org.apache.paimon.utils.VarLengthIntUtils.decodeLong;
import static org.apache.paimon.utils.VarLengthIntUtils.encodeLong;
@@ -73,9 +63,10 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
private final IOFunction<DataFileMeta, RecordReader<KeyValue>>
fileReaderFactory;
private final Supplier<File> localFileFactory;
private final LookupStoreFactory lookupStoreFactory;
- private final Cache<String, LookupFile> lookupFiles;
private final Function<Long, BloomFilter.Builder> bfGenerator;
- private final Set<String> cachedFiles;
+
+ private final Cache<String, LookupFile> lookupFileCache;
+ private final Set<String> ownCachedFiles;
public LookupLevels(
Levels levels,
@@ -86,7 +77,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
Supplier<File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Function<Long, BloomFilter.Builder> bfGenerator,
- Cache<String, LookupFile> lookupFiles) {
+ Cache<String, LookupFile> lookupFileCache) {
this.levels = levels;
this.keyComparator = keyComparator;
this.keySerializer = new RowCompactedSerializer(keyType);
@@ -95,39 +86,28 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
this.localFileFactory = localFileFactory;
this.lookupStoreFactory = lookupStoreFactory;
this.bfGenerator = bfGenerator;
- this.lookupFiles = lookupFiles;
- this.cachedFiles = new HashSet<>();
+ this.lookupFileCache = lookupFileCache;
+ this.ownCachedFiles = new HashSet<>();
levels.addDropFileCallback(this);
}
- public static Cache<String, LookupFile> createCache(
- Duration fileRetention, MemorySize maxDiskSize) {
- return Caffeine.newBuilder()
- .expireAfterAccess(fileRetention)
- .maximumWeight(maxDiskSize.getKibiBytes())
- .weigher((Weigher<String, LookupFile>) LookupLevels::fileWeigh)
- .removalListener(LookupLevels::removalCallback)
- .executor(MoreExecutors.directExecutor())
- .build();
- }
-
public Levels getLevels() {
return levels;
}
@VisibleForTesting
Cache<String, LookupFile> lookupFiles() {
- return lookupFiles;
+ return lookupFileCache;
}
@VisibleForTesting
Set<String> cachedFiles() {
- return cachedFiles;
+ return ownCachedFiles;
}
@Override
public void notifyDropFile(String file) {
- lookupFiles.invalidate(file);
+ lookupFileCache.invalidate(file);
}
@Nullable
@@ -147,12 +127,11 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
@Nullable
private T lookup(InternalRow key, DataFileMeta file) throws IOException {
- LookupFile lookupFile = lookupFiles.getIfPresent(file.fileName());
+ LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());
- while (lookupFile == null || lookupFile.isClosed) {
+ while (lookupFile == null || lookupFile.isClosed()) {
lookupFile = createLookupFile(file);
- cachedFiles.add(file.fileName());
- lookupFiles.put(file.fileName(), lookupFile);
+ lookupFileCache.put(file.fileName(), lookupFile);
}
byte[] keyBytes = keySerializer.serializeToBytes(key);
@@ -165,20 +144,6 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
key, lookupFile.remoteFile().level(), valueBytes,
file.fileName());
}
- private static int fileWeigh(String file, LookupFile lookupFile) {
- return fileKibiBytes(lookupFile.localFile);
- }
-
- private static void removalCallback(String key, LookupFile file,
RemovalCause cause) {
- if (file != null) {
- try {
- file.close();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
- }
-
private LookupFile createLookupFile(DataFileMeta file) throws IOException {
File localFile = localFileFactory.get();
if (!localFile.createNewFile()) {
@@ -218,55 +183,19 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
context = kvWriter.close();
}
+ ownCachedFiles.add(file.fileName());
return new LookupFile(
- localFile, file, lookupStoreFactory.createReader(localFile,
context), cachedFiles);
+ localFile,
+ file,
+ lookupStoreFactory.createReader(localFile, context),
+ () -> ownCachedFiles.remove(file.fileName()));
}
@Override
public void close() throws IOException {
- Set<String> toClean = new HashSet<>(cachedFiles);
+ Set<String> toClean = new HashSet<>(ownCachedFiles);
for (String cachedFile : toClean) {
- lookupFiles.invalidate(cachedFile);
- }
- }
-
- /** Lookup file. */
- public static class LookupFile implements Closeable {
-
- private final File localFile;
- private final DataFileMeta remoteFile;
- private final LookupStoreReader reader;
- private final Set<String> cachedFiles;
-
- private boolean isClosed = false;
-
- public LookupFile(
- File localFile,
- DataFileMeta remoteFile,
- LookupStoreReader reader,
- Set<String> cachedFiles) {
- this.localFile = localFile;
- this.remoteFile = remoteFile;
- this.reader = reader;
- this.cachedFiles = cachedFiles;
- }
-
- @Nullable
- public byte[] get(byte[] key) throws IOException {
- checkArgument(!isClosed);
- return reader.lookup(key);
- }
-
- public DataFileMeta remoteFile() {
- return remoteFile;
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- isClosed = true;
- cachedFiles.remove(remoteFile.fileName());
- FileIOUtils.deleteFileOrDirectory(localFile);
+ lookupFileCache.invalidate(cachedFile);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 6a5b30aea..85a796d2e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -43,6 +43,7 @@ import org.apache.paimon.io.RecordLevelExpire;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.Levels;
+import org.apache.paimon.mergetree.LookupFile;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.LookupLevels.ContainsValueProcessor;
import org.apache.paimon.mergetree.LookupLevels.KeyValueProcessor;
@@ -104,7 +105,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final RowType keyType;
private final RowType valueType;
@Nullable private final RecordLevelExpire recordLevelExpire;
- private Cache<String, LookupLevels.LookupFile> lookupFileCache;
+ @Nullable private Cache<String, LookupFile> lookupFileCache;
public KeyValueFileStoreWrite(
FileIO fileIO,
@@ -361,7 +362,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
Options options = this.options.toConfiguration();
if (lookupFileCache == null) {
lookupFileCache =
- LookupLevels.createCache(
+ LookupFile.createCache(
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 7475d6e8e..f0a32d101 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -34,6 +34,7 @@ import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.mergetree.Levels;
+import org.apache.paimon.mergetree.LookupFile;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
@@ -72,7 +73,7 @@ public class LocalTableQuery implements TableQuery {
private IOManager ioManager;
- private Cache<String, LookupLevels.LookupFile> lookupFileCache;
+ @Nullable private Cache<String, LookupFile> lookupFileCache;
public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
@@ -136,7 +137,7 @@ public class LocalTableQuery implements TableQuery {
Options options = this.options.toConfiguration();
if (lookupFileCache == null) {
lookupFileCache =
- LookupLevels.createCache(
+ LookupFile.createCache(
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 07972b28a..086548547 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -196,7 +196,7 @@ public class ContainsLevelsTest {
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048,
0.75, "none"),
rowCount -> BloomFilter.builder(rowCount, 0.01),
- LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
+ LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
}
private KeyValue kv(int key, int value) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index a5d58cf13..9945b54b9 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -275,7 +275,7 @@ public class LookupLevelsTest {
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048,
0.75, "none"),
rowCount -> BloomFilter.builder(rowCount, 0.05),
- LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
+ LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
}
private KeyValue kv(int key, int value) {