This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 36d5634ec0 [core] Download remote lookup files in LocalTableQuery reads (#8123) (#8125)
36d5634ec0 is described below
commit 36d5634ec018d331a94e9ca949cede5b6375b709
Author: Jordan Epstein <[email protected]>
AuthorDate: Sat Jun 6 03:45:02 2026 -0500
[core] Download remote lookup files in LocalTableQuery reads (#8123) (#8125)
---
.../apache/paimon/table/query/LocalTableQuery.java | 26 +++++
.../paimon/table/PrimaryKeySimpleTableTest.java | 108 +++++++++++++++++++++
2 files changed, 134 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 7672ee4341..f2e83d073f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -29,6 +29,7 @@ import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.cache.CacheManager;
@@ -38,6 +39,7 @@ import org.apache.paimon.mergetree.LookupFile;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.lookup.LookupSerializerFactory;
import org.apache.paimon.mergetree.lookup.PersistValueProcessor;
+import org.apache.paimon.mergetree.lookup.RemoteLookupFileManager;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
@@ -81,6 +83,7 @@ public class LocalTableQuery implements TableQuery {
private final RowType rowType;
private final RowType partitionType;
+ private final FileIO fileIO;
@Nullable private Filter<InternalRow> cacheRowFilter;
@@ -97,6 +100,7 @@ public class LocalTableQuery implements TableQuery {
this.readerFactoryBuilder = store.newReaderFactoryBuilder();
this.rowType = table.schema().logicalRowType();
this.partitionType = table.schema().logicalPartitionType();
+ this.fileIO = table.fileIO();
RowType keyType = readerFactoryBuilder.keyType();
this.keyComparatorSupplier = new
KeyComparatorSupplier(readerFactoryBuilder.keyType());
this.lookupStoreFactory =
@@ -166,6 +170,28 @@ public class LocalTableQuery implements TableQuery {
bfGenerator(options),
lookupFileCache);
+ // Optimization - download lookup files if already persisted to object
store
+ // We download these files if three conditions are met
+ // 1) lookup.remote-file.enabled is true - files are persisted in the
first place
+ // 2) deletion-vectors.enabled is false - SSTables only contain row
positions, not values,
+ // when DVs are enabled
+ // 3) The client is accessing the full data row, as opposed to a
projection
+ // - The persisted remote SSTable files are created during
compaction and hold the entire
+ // data row value
+ // - We could deserialize and project in memory, but we'll have to
read much more data,
+ // not as clear of a win
+ boolean fullValueRead =
readerFactoryBuilder.readValueType().equals(rowType);
+ if (this.options.lookupRemoteFileEnabled()
+ && !this.options.deletionVectorsEnabled()
+ && fullValueRead) {
+ // Calling the constructor tells `lookupLevels` to load remote
files
+ new RemoteLookupFileManager<>(
+ fileIO,
+ factory.pathFactory(),
+ lookupLevels,
+ this.options.lookupRemoteLevelThreshold());
+ }
+
tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket,
lookupLevels);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 4628710b9e..245eb6ccf0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
@@ -2320,6 +2321,113 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
innerTestTableQuery(table);
}
+    /**
+     * Verifies that {@link LocalTableQuery} answers full-row lookups from the remote lookup sst
+     * persisted by the writer (even after the data files are deleted), while a value-projected
+     * query does not reuse that sst and instead rebuilds the lookup file from the data files.
+     */
+    @Test
+    public void testTableQueryDownloadsRemoteLookupFile() throws Exception {
+        // Writer persists remote lookup ssts (deletion-vectors off -> "value" processor).
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            options.set(CHANGELOG_PRODUCER, LOOKUP);
+                            options.set(CoreOptions.LOOKUP_REMOTE_FILE_ENABLED, true);
+                        });
+        // Every closeable below is released in a finally block so that a failing assertion
+        // does not leak the IOManager, the writer/committer, or an open query.
+        IOManager ioManager = IOManager.create(tablePath.toString());
+        try {
+            StreamTableWrite write = table.newWrite(commitUser).withIOManager(ioManager);
+            StreamTableCommit commit = table.newCommit(commitUser);
+            try {
+                write.write(rowData(1, 10, 100L));
+                write.write(rowData(1, 20, 200L));
+                commit.commit(0, write.prepareCommit(true, 0));
+
+                // overwrite key 10 then fully compact so all data lands in a single
+                // high-level file that carries a remote lookup sst
+                write.write(rowData(1, 10, 110L));
+                write.compact(binaryRow(1), 0, true);
+                commit.commit(1, write.prepareCommit(true, 1));
+            } finally {
+                write.close();
+                commit.close();
+            }
+
+            List<DataSplit> dataSplits = table.newSnapshotReader().read().dataSplits();
+            List<DataFileMeta> dataFiles = new ArrayList<>();
+            for (DataSplit split : dataSplits) {
+                dataFiles.addAll(split.dataFiles());
+            }
+            // every remaining data file must carry a remote lookup sst, otherwise deleting the
+            // data files below would make a lookup unanswerable regardless of the downloader
+            assertThat(dataFiles).isNotEmpty();
+            assertThat(dataFiles)
+                    .allMatch(
+                            f ->
+                                    f.extraFiles().stream()
+                                            .anyMatch(e -> e.endsWith(".value.v1.lookup")));
+
+            // back up then delete the underlying data files: now a lookup can only be answered
+            // by downloading the remote sst the writer persisted; a local rebuild (scan of the
+            // data file) would fail. The backups let us restore the data files later to
+            // exercise the rebuild path.
+            LocalFileIO fileIO = LocalFileIO.create();
+            Map<Path, Path> dataFileBackups = new HashMap<>();
+            for (DataSplit split : dataSplits) {
+                for (DataFileMeta f : split.dataFiles()) {
+                    Path dataFilePath =
+                            table.store()
+                                    .pathFactory()
+                                    .createDataFilePathFactory(split.partition(), split.bucket())
+                                    .toPath(f);
+                    Path backupPath = new Path(dataFilePath.getParent(), f.fileName() + ".bak");
+                    fileIO.copyFile(dataFilePath, backupPath, false);
+                    dataFileBackups.put(dataFilePath, backupPath);
+                    fileIO.deleteQuietly(dataFilePath);
+                }
+            }
+
+            // full value (no projection) -> downloader is wired -> lookup succeeds via download
+            LocalTableQuery query = table.newLocalTableQuery().withIOManager(ioManager);
+            try {
+                for (DataSplit split : dataSplits) {
+                    query.refreshFiles(
+                            split.partition(),
+                            split.bucket(),
+                            Collections.emptyList(),
+                            split.dataFiles());
+                }
+                InternalRow value = query.lookup(row(1), 0, row(10));
+                assertThat(value).isNotNull();
+                assertThat(BATCH_ROW_TO_STRING.apply(value))
+                        .isEqualTo("1|10|110|binary|varbinary|mapKey:mapVal|multiset");
+                value = query.lookup(row(1), 0, row(20));
+                assertThat(value).isNotNull();
+                assertThat(BATCH_ROW_TO_STRING.apply(value))
+                        .isEqualTo("1|20|200|binary|varbinary|mapKey:mapVal|multiset");
+            } finally {
+                query.close();
+            }
+
+            // value projection -> remote sst (full value) is unsafe to reuse, so the downloader
+            // is NOT wired and the read path falls back to rebuilding from the (now deleted)
+            // data file
+            LocalTableQuery projected =
+                    table.newLocalTableQuery()
+                            .withValueProjection(new int[] {2, 1, 0})
+                            .withIOManager(ioManager);
+            try {
+                for (DataSplit split : dataSplits) {
+                    projected.refreshFiles(
+                            split.partition(),
+                            split.bucket(),
+                            Collections.emptyList(),
+                            split.dataFiles());
+                }
+                assertThatThrownBy(() -> projected.lookup(row(1), 0, row(10)))
+                        .isInstanceOf(Exception.class);
+            } finally {
+                projected.close();
+            }
+
+            // restore the data files and confirm the same projected query (downloader still
+            // not wired) now answers the lookup by rebuilding the sst locally from the data file
+            for (Map.Entry<Path, Path> backup : dataFileBackups.entrySet()) {
+                fileIO.copyFile(backup.getValue(), backup.getKey(), false);
+            }
+            LocalTableQuery rebuilt =
+                    table.newLocalTableQuery()
+                            .withValueProjection(new int[] {2, 1, 0})
+                            .withIOManager(ioManager);
+            try {
+                for (DataSplit split : dataSplits) {
+                    rebuilt.refreshFiles(
+                            split.partition(),
+                            split.bucket(),
+                            Collections.emptyList(),
+                            split.dataFiles());
+                }
+                InternalRow projectedValue = rebuilt.lookup(row(1), 0, row(10));
+                assertThat(projectedValue).isNotNull();
+                Function<InternalRow, String> projectToString =
+                        r -> r.getLong(0) + "|" + r.getInt(1) + "|" + r.getInt(2);
+                assertThat(projectToString.apply(projectedValue)).isEqualTo("110|10|1");
+            } finally {
+                rebuilt.close();
+            }
+        } finally {
+            ioManager.close();
+        }
+    }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLookupWithDropDelete(boolean specificConfig) throws
Exception {