This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 36d5634ec0 [core] Download remote lookup files in LocalTableQuery 
reads (#8123) (#8125)
36d5634ec0 is described below

commit 36d5634ec018d331a94e9ca949cede5b6375b709
Author: Jordan Epstein <[email protected]>
AuthorDate: Sat Jun 6 03:45:02 2026 -0500

    [core] Download remote lookup files in LocalTableQuery reads (#8123) (#8125)
---
 .../apache/paimon/table/query/LocalTableQuery.java |  26 +++++
 .../paimon/table/PrimaryKeySimpleTableTest.java    | 108 +++++++++++++++++++++
 2 files changed, 134 insertions(+)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 7672ee4341..f2e83d073f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -29,6 +29,7 @@ import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.cache.CacheManager;
@@ -38,6 +39,7 @@ import org.apache.paimon.mergetree.LookupFile;
 import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.mergetree.lookup.LookupSerializerFactory;
 import org.apache.paimon.mergetree.lookup.PersistValueProcessor;
+import org.apache.paimon.mergetree.lookup.RemoteLookupFileManager;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -81,6 +83,7 @@ public class LocalTableQuery implements TableQuery {
 
     private final RowType rowType;
     private final RowType partitionType;
+    private final FileIO fileIO;
 
     @Nullable private Filter<InternalRow> cacheRowFilter;
 
@@ -97,6 +100,7 @@ public class LocalTableQuery implements TableQuery {
         this.readerFactoryBuilder = store.newReaderFactoryBuilder();
         this.rowType = table.schema().logicalRowType();
         this.partitionType = table.schema().logicalPartitionType();
+        this.fileIO = table.fileIO();
         RowType keyType = readerFactoryBuilder.keyType();
         this.keyComparatorSupplier = new 
KeyComparatorSupplier(readerFactoryBuilder.keyType());
         this.lookupStoreFactory =
@@ -166,6 +170,28 @@ public class LocalTableQuery implements TableQuery {
                         bfGenerator(options),
                         lookupFileCache);
 
+        // Optimization: download lookup files that were already persisted to the object store.
+        // We download these files only if three conditions are met:
+        // 1) lookup.remote-file.enabled is true - files are persisted in the first place
+        // 2) deletion-vectors.enabled is false - when DVs are enabled, SSTables only contain
+        //    row positions, not values
+        // 3) The client is accessing the full data row, as opposed to a projection
+        //    - The persisted remote SSTable files are created during compaction and hold the
+        //      entire data row value
+        //    - We could deserialize and project in memory, but we would have to read much more
+        //      data, so it is not a clear win
+        boolean fullValueRead = 
readerFactoryBuilder.readValueType().equals(rowType);
+        if (this.options.lookupRemoteFileEnabled()
+                && !this.options.deletionVectorsEnabled()
+                && fullValueRead) {
+            // Side effect only: the constructor tells `lookupLevels` to load remote files
+            new RemoteLookupFileManager<>(
+                    fileIO,
+                    factory.pathFactory(),
+                    lookupLevels,
+                    this.options.lookupRemoteLevelThreshold());
+        }
+
         tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, 
lookupLevels);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 4628710b9e..245eb6ccf0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
@@ -2320,6 +2321,113 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
         innerTestTableQuery(table);
     }
 
+    @Test
+    public void testTableQueryDownloadsRemoteLookupFile() throws Exception {
+        // Writer persists remote lookup ssts (deletion-vectors off -> "value" 
processor).
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            options.set(CHANGELOG_PRODUCER, LOOKUP);
+                            
options.set(CoreOptions.LOOKUP_REMOTE_FILE_ENABLED, true);
+                        });
+        IOManager ioManager = IOManager.create(tablePath.toString());
+        StreamTableWrite write = 
table.newWrite(commitUser).withIOManager(ioManager);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        // overwrite key 10 then fully compact so all data lands in a single 
high-level file that
+        // carries a remote lookup sst
+        write.write(rowData(1, 10, 110L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+        write.close();
+        commit.close();
+
+        List<DataSplit> dataSplits = 
table.newSnapshotReader().read().dataSplits();
+        List<DataFileMeta> dataFiles = new ArrayList<>();
+        for (DataSplit split : dataSplits) {
+            dataFiles.addAll(split.dataFiles());
+        }
+        // every remaining data file must carry a remote lookup sst, otherwise 
deleting the data
+        // files below would make a lookup unanswerable regardless of the 
downloader
+        assertThat(dataFiles).isNotEmpty();
+        assertThat(dataFiles)
+                .allMatch(
+                        f -> f.extraFiles().stream().anyMatch(e -> 
e.endsWith(".value.v1.lookup")));
+
+        // back up then delete the underlying data files: now a lookup can 
only be answered by
+        // downloading the remote sst the writer persisted; a local rebuild 
(scan of the data file)
+        // would fail. The backups let us restore the data files later to 
exercise the rebuild path.
+        LocalFileIO fileIO = LocalFileIO.create();
+        Map<Path, Path> dataFileBackups = new HashMap<>();
+        for (DataSplit split : dataSplits) {
+            for (DataFileMeta f : split.dataFiles()) {
+                Path dataFilePath =
+                        table.store()
+                                .pathFactory()
+                                .createDataFilePathFactory(split.partition(), 
split.bucket())
+                                .toPath(f);
+                Path backupPath = new Path(dataFilePath.getParent(), 
f.fileName() + ".bak");
+                fileIO.copyFile(dataFilePath, backupPath, false);
+                dataFileBackups.put(dataFilePath, backupPath);
+                fileIO.deleteQuietly(dataFilePath);
+            }
+        }
+
+        // full value (no projection) -> downloader is wired -> lookup 
succeeds via download
+        LocalTableQuery query = 
table.newLocalTableQuery().withIOManager(ioManager);
+        for (DataSplit split : dataSplits) {
+            query.refreshFiles(
+                    split.partition(), split.bucket(), 
Collections.emptyList(), split.dataFiles());
+        }
+        InternalRow value = query.lookup(row(1), 0, row(10));
+        assertThat(value).isNotNull();
+        assertThat(BATCH_ROW_TO_STRING.apply(value))
+                .isEqualTo("1|10|110|binary|varbinary|mapKey:mapVal|multiset");
+        value = query.lookup(row(1), 0, row(20));
+        assertThat(value).isNotNull();
+        assertThat(BATCH_ROW_TO_STRING.apply(value))
+                .isEqualTo("1|20|200|binary|varbinary|mapKey:mapVal|multiset");
+        query.close();
+
+        // value projection -> remote sst (full value) is unsafe to reuse, so 
the downloader is
+        // NOT wired and the read path falls back to rebuilding from the (now 
deleted) data file
+        LocalTableQuery projected =
+                table.newLocalTableQuery()
+                        .withValueProjection(new int[] {2, 1, 0})
+                        .withIOManager(ioManager);
+        for (DataSplit split : dataSplits) {
+            projected.refreshFiles(
+                    split.partition(), split.bucket(), 
Collections.emptyList(), split.dataFiles());
+        }
+        assertThatThrownBy(() -> projected.lookup(row(1), 0, row(10)))
+                .isInstanceOf(Exception.class);
+        projected.close();
+
+        // restore the data files; a new query with the same projection (downloader still not wired)
+        // must now answer the lookup by rebuilding the sst locally from the data file
+        for (Map.Entry<Path, Path> backup : dataFileBackups.entrySet()) {
+            fileIO.copyFile(backup.getValue(), backup.getKey(), false);
+        }
+        LocalTableQuery rebuilt =
+                table.newLocalTableQuery()
+                        .withValueProjection(new int[] {2, 1, 0})
+                        .withIOManager(ioManager);
+        for (DataSplit split : dataSplits) {
+            rebuilt.refreshFiles(
+                    split.partition(), split.bucket(), 
Collections.emptyList(), split.dataFiles());
+        }
+        InternalRow projectedValue = rebuilt.lookup(row(1), 0, row(10));
+        assertThat(projectedValue).isNotNull();
+        Function<InternalRow, String> projectToString =
+                r -> r.getLong(0) + "|" + r.getInt(1) + "|" + r.getInt(2);
+        
assertThat(projectToString.apply(projectedValue)).isEqualTo("110|10|1");
+        rebuilt.close();
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testLookupWithDropDelete(boolean specificConfig) throws 
Exception {

Reply via email to