This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e865e19fab [core] Support row id push down (#6483)
e865e19fab is described below
commit e865e19fabcdbc870fcb45f72e79fd261ea25bd2
Author: YeJunHao <[email protected]>
AuthorDate: Wed Oct 29 15:27:24 2025 +0800
[core] Support row id push down (#6483)
---
.../apache/paimon/reader/FileRecordIterator.java | 45 ++++
.../paimon/reader/FileRecordIteratorTest.java | 247 +++++++++++++++++++++
.../java/org/apache/paimon/io/DataFileMeta.java | 3 +
.../org/apache/paimon/io/DataFileRecordReader.java | 7 +
.../org/apache/paimon/io/PojoDataFileMeta.java | 21 ++
.../paimon/operation/AbstractFileStoreScan.java | 6 +
.../operation/DataEvolutionFileStoreScan.java | 33 ++-
.../paimon/operation/DataEvolutionSplitRead.java | 30 ++-
.../org/apache/paimon/operation/FileStoreScan.java | 2 +
.../apache/paimon/operation/RawFileSplitRead.java | 15 ++
.../org/apache/paimon/operation/SplitRead.java | 11 +
.../paimon/table/format/FormatReadBuilder.java | 5 +
.../paimon/table/source/AbstractDataTableRead.java | 9 +
.../paimon/table/source/AbstractDataTableScan.java | 6 +
.../paimon/table/source/AppendTableRead.java | 8 +
.../apache/paimon/table/source/InnerTableRead.java | 4 +
.../apache/paimon/table/source/InnerTableScan.java | 4 +
.../paimon/table/source/KeyValueTableRead.java | 5 +
.../apache/paimon/table/source/ReadBuilder.java | 8 +
.../paimon/table/source/ReadBuilderImpl.java | 16 +-
.../table/source/snapshot/SnapshotReader.java | 2 +
.../table/source/snapshot/SnapshotReaderImpl.java | 6 +
.../splitread/DataEvolutionSplitReadProvider.java | 5 +
.../apache/paimon/table/system/AuditLogTable.java | 6 +
.../org/apache/paimon/append/BlobTableTest.java | 53 +++++
.../paimon/table/DataEvolutionTableTest.java | 173 +++++++++++++++
.../DataEvolutionSplitReadProviderTest.java | 6 +
.../paimon/flink/lookup/LookupCompactDiffRead.java | 6 +
.../paimon/format/blob/BlobFormatReader.java | 9 +-
29 files changed, 739 insertions(+), 12 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
index 2d3c85f193..6911dffe7f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
@@ -20,10 +20,12 @@ package org.apache.paimon.reader;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.RoaringBitmap32;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Iterator;
import java.util.function.Function;
/**
@@ -107,4 +109,47 @@ public interface FileRecordIterator<T> extends RecordReader.RecordIterator<T> {
}
};
}
+
+ default FileRecordIterator<T> selection(RoaringBitmap32 selection) {
+ FileRecordIterator<T> thisIterator = this;
+ final Iterator<Integer> selects = selection.iterator();
+ return new FileRecordIterator<T>() {
+            private long nextExpected = selects.hasNext() ? selects.next() : -1;
+
+ @Override
+ public long returnedPosition() {
+ return thisIterator.returnedPosition();
+ }
+
+ @Override
+ public Path filePath() {
+ return thisIterator.filePath();
+ }
+
+ @Nullable
+ @Override
+ public T next() throws IOException {
+ while (true) {
+ if (nextExpected == -1) {
+ return null;
+ }
+ T next = thisIterator.next();
+ if (next == null) {
+ return null;
+ }
+                while (nextExpected != -1 && nextExpected < returnedPosition()) {
+ nextExpected = selects.hasNext() ? selects.next() : -1;
+ }
+ if (nextExpected == returnedPosition()) {
+ return next;
+ }
+ }
+ }
+
+ @Override
+ public void releaseBatch() {
+ thisIterator.releaseBatch();
+ }
+ };
+ }
}
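
For context, the new selection wrapper keeps only the records whose file
position is present in the bitmap; returnedPosition() still reports positions
in the underlying file, so callers can map records back to row ids. A minimal
usage sketch (illustrative only, not part of the patch):

    RoaringBitmap32 positions = new RoaringBitmap32();
    positions.add(3);
    positions.add(7);
    FileRecordIterator<InternalRow> filtered = iterator.selection(positions);
    InternalRow row;
    while ((row = filtered.next()) != null) {
        // Only the records at file positions 3 and 7 are returned;
        // filtered.returnedPosition() reports 3, then 7.
    }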
diff --git a/paimon-common/src/test/java/org/apache/paimon/reader/FileRecordIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/reader/FileRecordIteratorTest.java
new file mode 100644
index 0000000000..4fd0f44689
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/reader/FileRecordIteratorTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileRecordIterator}. */
+public class FileRecordIteratorTest {
+
+ @Test
+ public void testSelection() throws IOException {
+        List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+ FileRecordIterator<Long> iterator = createIterator(values);
+
+ RoaringBitmap32 selection = new RoaringBitmap32();
+ selection.add(1);
+ selection.add(3);
+ selection.add(5);
+ selection.add(7);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+ List<Long> result = collectAll(filteredIterator);
+
+ assertThat(result).containsExactly(1L, 3L, 5L, 7L);
+ }
+
+ @Test
+ public void testSelectionWithEmptySelection() throws IOException {
+ List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L);
+ FileRecordIterator<Long> iterator = createIterator(values);
+
+ RoaringBitmap32 selection = new RoaringBitmap32();
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+ List<Long> result = collectAll(filteredIterator);
+
+ assertThat(result).isEmpty();
+ }
+
+ @Test
+ public void testSelectionWithAllSelected() throws IOException {
+ List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L);
+ FileRecordIterator<Long> iterator = createIterator(values);
+
+ RoaringBitmap32 selection = new RoaringBitmap32();
+ for (int i = 0; i < values.size(); i++) {
+ selection.add(i);
+ }
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+ List<Long> result = collectAll(filteredIterator);
+
+ assertThat(result).containsExactly(0L, 1L, 2L, 3L, 4L);
+ }
+
+ @Test
+ public void testSelectionWithFirstElement() throws IOException {
+ List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L);
+ FileRecordIterator<Long> iterator = createIterator(values);
+
+ RoaringBitmap32 selection = new RoaringBitmap32();
+ selection.add(0);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+ List<Long> result = collectAll(filteredIterator);
+
+ assertThat(result).containsExactly(0L);
+ }
+
+ @Test
+ public void testSelectionWithLastElement() throws IOException {
+ List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L);
+ FileRecordIterator<Long> iterator = createIterator(values);
+
+ RoaringBitmap32 selection = new RoaringBitmap32();
+ selection.add(4);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+ List<Long> result = collectAll(filteredIterator);
+
+ assertThat(result).containsExactly(4L);
+ }
+
+ @Test
+ public void testSelectionWithConsecutivePositions() throws IOException {
+        List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+ FileRecordIterator<Long> iterator = createIterator(values);
+
+ RoaringBitmap32 selection = new RoaringBitmap32();
+ selection.add(2);
+ selection.add(3);
+ selection.add(4);
+ selection.add(5);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+ List<Long> result = collectAll(filteredIterator);
+
+ assertThat(result).containsExactly(2L, 3L, 4L, 5L);
+ }
+
+ @Test
+ public void testSelectionWithSparsePositions() throws IOException {
+ List<Long> values = new ArrayList<>();
+ for (long i = 0; i < 100; i++) {
+ values.add(i);
+ }
+ FileRecordIterator<Long> iterator = createIterator(values);
+
+ RoaringBitmap32 selection = new RoaringBitmap32();
+ selection.add(0);
+ selection.add(10);
+ selection.add(50);
+ selection.add(99);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+ List<Long> result = collectAll(filteredIterator);
+
+ assertThat(result).containsExactly(0L, 10L, 50L, 99L);
+ }
+
+ @Test
+ public void testSelectionWithOutOfRangePositions() throws IOException {
+ List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L);
+ FileRecordIterator<Long> iterator = createIterator(values);
+
+ RoaringBitmap32 selection = new RoaringBitmap32();
+ selection.add(1);
+ selection.add(3);
+ selection.add(10);
+ selection.add(20);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+ List<Long> result = collectAll(filteredIterator);
+
+ assertThat(result).containsExactly(1L, 3L);
+ }
+
+ @Test
+ public void testSelectionPositionTracking() throws IOException {
+ List<Long> values = Arrays.asList(10L, 20L, 30L, 40L, 50L);
+ FileRecordIterator<Long> iterator = createIterator(values);
+
+ RoaringBitmap32 selection = new RoaringBitmap32();
+ selection.add(1);
+ selection.add(3);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+ Long first = filteredIterator.next();
+ assertThat(first).isEqualTo(20L);
+ assertThat(filteredIterator.returnedPosition()).isEqualTo(1);
+
+ Long second = filteredIterator.next();
+ assertThat(second).isEqualTo(40L);
+ assertThat(filteredIterator.returnedPosition()).isEqualTo(3);
+
+ Long third = filteredIterator.next();
+ assertThat(third).isNull();
+ }
+
+ @Test
+ public void testSelectionFilePathPreserved() {
+ List<Long> values = Arrays.asList(0L, 1L, 2L);
+ FileRecordIterator<Long> iterator = createIterator(values);
+
+ RoaringBitmap32 selection = new RoaringBitmap32();
+ selection.add(1);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        assertThat(filteredIterator.filePath().toString()).isEqualTo("test-file.parquet");
+ }
+
+ private FileRecordIterator<Long> createIterator(List<Long> values) {
+ return new FileRecordIterator<Long>() {
+ private int position = -1;
+
+ @Override
+ public long returnedPosition() {
+ return position;
+ }
+
+ @Override
+ public Path filePath() {
+ return new Path("test-file.parquet");
+ }
+
+ @Nullable
+ @Override
+ public Long next() {
+ position++;
+ if (position >= values.size()) {
+ return null;
+ }
+ return values.get(position);
+ }
+
+ @Override
+ public void releaseBatch() {}
+ };
+ }
+
+    private List<Long> collectAll(FileRecordIterator<Long> iterator) throws IOException {
+ List<Long> result = new ArrayList<>();
+ Long value;
+ while ((value = iterator.next()) != null) {
+ result.add(value);
+ }
+ return result;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index e7bb8b9571..3c24392a78 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.RoaringBitmap32;
import javax.annotation.Nullable;
@@ -326,6 +327,8 @@ public interface DataFileMeta {
DataFileMeta copy(byte[] newEmbeddedIndex);
+ RoaringBitmap32 toFileSelection(List<Long> indices);
+
static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
return fileMetas.stream()
.map(DataFileMeta::maxSequenceNumber)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 8f7c9d6dad..5f8a217b98 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.RoaringBitmap32;
import javax.annotation.Nullable;
@@ -52,6 +53,7 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
@Nullable private final Long firstRowId;
private final long maxSequenceNumber;
private final Map<String, Integer> systemFields;
+ @Nullable private final RoaringBitmap32 selection;
public DataFileRecordReader(
RowType tableRowType,
@@ -79,6 +81,7 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
this.firstRowId = firstRowId;
this.maxSequenceNumber = maxSequenceNumber;
this.systemFields = systemFields;
+ this.selection = context.selection();
}
@Nullable
@@ -144,6 +147,10 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
iterator = iterator.transform(castedRow::replaceRow);
}
+ if (selection != null) {
+ iterator = iterator.selection(selection);
+ }
+
return iterator;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
index 88f1a740ed..f4df50a3a2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.utils.RoaringBitmap32;
import javax.annotation.Nullable;
@@ -453,6 +454,26 @@ public class PojoDataFileMeta implements DataFileMeta {
writeCols);
}
+ @Override
+ public RoaringBitmap32 toFileSelection(List<Long> indices) {
+ RoaringBitmap32 selection = null;
+ if (indices != null) {
+ if (firstRowId() == null) {
+ throw new IllegalStateException(
+ "firstRowId is null, can't convert to file selection");
+ }
+ selection = new RoaringBitmap32();
+ long start = firstRowId();
+ long end = start + rowCount();
+ for (long rowId : indices) {
+ if (rowId >= start && rowId < end) {
+ selection.add((int) (rowId - start));
+ }
+ }
+ }
+ return selection;
+ }
+
@Override
public boolean equals(Object o) {
if (o == this) {
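
The mapping here is from global row ids to file-local positions: a file covers
row ids [firstRowId, firstRowId + rowCount), and each id in that range is
shifted down by firstRowId. A worked example with assumed values:

    // file: firstRowId = 100, rowCount = 50 -> covers row ids [100, 150)
    RoaringBitmap32 selection = file.toFileSelection(Arrays.asList(20L, 120L, 149L, 200L));
    // 20 and 200 fall outside [100, 150) and are dropped;
    // 120 maps to position 20 and 149 to position 49 in the returned bitmap.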
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 2c69b2ce80..47ac87fd77 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -237,6 +237,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan withRowIds(List<Long> indices) {
+ // do nothing by default
+ return this;
+ }
+
@Nullable
@Override
public Integer parallelism() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 71380ba88a..d559ed2be0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -35,6 +35,8 @@ import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -45,6 +47,7 @@ import java.util.stream.Collectors;
public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
private boolean dropStats = false;
+ @Nullable private List<Long> indices;
public DataEvolutionFileStoreScan(
ManifestsReader manifestsReader,
@@ -82,6 +85,12 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan withRowIds(List<Long> indices) {
+ this.indices = indices;
+ return this;
+ }
+
@Override
protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
if (inputFilter == null) {
@@ -189,6 +198,28 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
/** Note: Keep this thread-safe. */
@Override
protected boolean filterByStats(ManifestEntry entry) {
- return true;
+ // If indices is null, all entries should be kept
+ if (this.indices == null) {
+ return true;
+ }
+
+ // If entry.firstRowId does not exist, keep the entry
+ Long firstRowId = entry.file().firstRowId();
+ if (firstRowId == null) {
+ return true;
+ }
+
+        // Check if any value in indices is in the range [firstRowId, firstRowId + rowCount)
+ long rowCount = entry.file().rowCount();
+ long endRowId = firstRowId + rowCount;
+
+ for (Long index : this.indices) {
+ if (index >= firstRowId && index < endRowId) {
+ return true;
+ }
+ }
+
+ // No matching indices found, skip this entry
+ return false;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index f2f15dcbbd..9d7d753686 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -47,6 +47,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.FormatReaderMapping.Builder;
+import org.apache.paimon.utils.RoaringBitmap32;
import javax.annotation.Nullable;
@@ -78,6 +79,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
private final FileStorePathFactory pathFactory;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final Function<Long, TableSchema> schemaFetcher;
+ @Nullable private List<Long> indices;
protected RowType readRowType;
@@ -122,6 +124,12 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
return this;
}
+ @Override
+ public SplitRead<InternalRow> withRowIds(@Nullable List<Long> indices) {
+ this.indices = indices;
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
List<DataFileMeta> files = split.dataFiles();
@@ -188,13 +196,15 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
long rowCount = fieldsFiles.get(0).rowCount();
long firstRowId = fieldsFiles.get(0).files().get(0).firstRowId();
- for (FieldBunch bunch : fieldsFiles) {
- checkArgument(
- bunch.rowCount() == rowCount,
-                    "All files in a field merge split should have the same row count.");
- checkArgument(
- bunch.files().get(0).firstRowId() == firstRowId,
-                    "All files in a field merge split should have the same first row id and could not be null.");
+ if (indices == null) {
+ for (FieldBunch bunch : fieldsFiles) {
+ checkArgument(
+ bunch.rowCount() == rowCount,
+                        "All files in a field merge split should have the same row count.");
+ checkArgument(
+ bunch.files().get(0).firstRowId() == firstRowId,
+                        "All files in a field merge split should have the same first row id and could not be null.");
+ }
}
// Init all we need to create a compound reader
@@ -311,9 +321,10 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
}
List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
for (DataFileMeta file : bunch.files()) {
+ RoaringBitmap32 selection = file.toFileSelection(indices);
FormatReaderContext formatReaderContext =
new FormatReaderContext(
-                            fileIO, dataFilePathFactory.toPath(file), file.fileSize(), null);
+                            fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection);
readerSuppliers.add(
() ->
new DataFileRecordReader(
@@ -338,9 +349,10 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
DataFilePathFactory dataFilePathFactory,
FormatReaderMapping formatReaderMapping)
throws IOException {
+ RoaringBitmap32 selection = file.toFileSelection(indices);
FormatReaderContext formatReaderContext =
new FormatReaderContext(
-                        fileIO, dataFilePathFactory.toPath(file), file.fileSize(), null);
+                        fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection);
return new DataFileRecordReader(
schema.logicalRowType(),
formatReaderMapping.getReaderFactory(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 047b1c3f5d..eb52fb645f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -89,6 +89,8 @@ public interface FileStoreScan {
FileStoreScan keepStats();
+ FileStoreScan withRowIds(List<Long> indices);
+
@Nullable
Integer parallelism();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 5dc8522a8f..fe5f0df9e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -84,6 +84,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
@Nullable private List<Predicate> filters;
@Nullable private TopN topN;
@Nullable private Integer limit;
+ @Nullable private List<Long> indices;
public RawFileSplitRead(
FileIO fileIO,
@@ -141,6 +142,12 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
return this;
}
+ @Override
+ public SplitRead<InternalRow> withRowIds(@Nullable List<Long> indices) {
+ this.indices = indices;
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
if (!split.beforeFiles().isEmpty()) {
@@ -251,6 +258,14 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
if (fileIndexResult instanceof BitmapIndexResult) {
selection = ((BitmapIndexResult) fileIndexResult).get();
}
+ if (indices != null) {
+ RoaringBitmap32 selectionRowIds = file.toFileSelection(indices);
+ if (selection == null) {
+ selection = selectionRowIds;
+ } else {
+ selection.and(selectionRowIds);
+ }
+ }
FormatReaderContext formatReaderContext =
new FormatReaderContext(
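
When a bitmap file-index result is already present, the row-id selection is
intersected with it, so a position must pass both filters to be read. Roughly,
with assumed bitmap contents:

    // selection from BitmapIndexResult:          {1, 2, 5}
    // selectionRowIds from file.toFileSelection: {2, 5, 9}
    selection.and(selectionRowIds);            // -> {2, 5}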
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
index d646375ef6..bd23d09517 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.IOFunction;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.List;
/**
* Read operation which provides {@link RecordReader} creation.
@@ -53,6 +54,10 @@ public interface SplitRead<T> {
return this;
}
+ default SplitRead<T> withRowIds(@Nullable List<Long> indices) {
+ return this;
+ }
+
/** Create a {@link RecordReader} from split. */
RecordReader<T> createReader(DataSplit split) throws IOException;
@@ -83,6 +88,12 @@ public interface SplitRead<T> {
return this;
}
+ @Override
+ public SplitRead<R> withRowIds(@Nullable List<Long> indices) {
+ read.withRowIds(indices);
+ return this;
+ }
+
@Override
public RecordReader<R> createReader(DataSplit split) throws IOException {
return convertedFactory.apply(split);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 95df280d9d..81ff015769 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -220,6 +220,11 @@ public class FormatReadBuilder implements ReadBuilder {
throw new UnsupportedOperationException("Format Table does not support
withShard.");
}
+ @Override
+ public ReadBuilder withRowIds(List<Long> indices) {
+ throw new UnsupportedOperationException("Format Table does not support
withRowIds.");
+ }
+
@Override
public StreamTableScan newStreamScan() {
throw new UnsupportedOperationException("Format Table does not support
stream scan.");
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index fe73d3c251..1d1cb69279 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -27,6 +27,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
import java.io.IOException;
+import java.util.List;
import java.util.Optional;
/** A {@link InnerTableRead} for data table. */
@@ -43,6 +44,8 @@ public abstract class AbstractDataTableRead implements InnerTableRead {
public abstract void applyReadType(RowType readType);
+ public abstract void applyRowIds(List<Long> indices);
+
public abstract RecordReader<InternalRow> reader(Split split) throws IOException;
@Override
@@ -79,6 +82,12 @@ public abstract class AbstractDataTableRead implements InnerTableRead {
return this;
}
+ @Override
+ public InnerTableRead withRowIds(List<Long> indices) {
+ applyRowIds(indices);
+ return this;
+ }
+
@Override
public final RecordReader<InternalRow> createReader(Split split) throws IOException {
RecordReader<InternalRow> reader = reader(split);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 19b9220278..cdbb5bc3c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -171,6 +171,12 @@ abstract class AbstractDataTableScan implements DataTableScan {
return this;
}
+ @Override
+ public InnerTableScan withRowIds(List<Long> indices) {
+ snapshotReader.withRowIds(indices);
+ return this;
+ }
+
public CoreOptions options() {
return options;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index 8feb177db5..69ed61b0de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -48,6 +48,7 @@ public final class AppendTableRead extends AbstractDataTableRead {
private Predicate predicate = null;
private TopN topN = null;
private Integer limit = null;
+ @Nullable private List<Long> indices;
public AppendTableRead(
List<Function<SplitReadConfig, SplitReadProvider>> providerFactories,
@@ -76,6 +77,7 @@ public final class AppendTableRead extends AbstractDataTableRead {
read.withFilter(predicate);
read.withTopN(topN);
read.withLimit(limit);
+ read.withRowIds(indices);
}
@Override
@@ -84,6 +86,12 @@ public final class AppendTableRead extends AbstractDataTableRead {
this.readType = readType;
}
+ @Override
+ public void applyRowIds(List<Long> indices) {
+ initialized().forEach(r -> r.withRowIds(indices));
+ this.indices = indices;
+ }
+
@Override
protected InnerTableRead innerWithFilter(Predicate predicate) {
initialized().forEach(r -> r.withFilter(predicate));
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index b4da78ef6f..937ab05786 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -72,4 +72,8 @@ public interface InnerTableRead extends TableRead {
default InnerTableRead withMetricRegistry(MetricRegistry registry) {
return this;
}
+
+ default InnerTableRead withRowIds(List<Long> indices) {
+ return this;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index d0015445b3..7df93865b4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -60,6 +60,10 @@ public interface InnerTableScan extends TableScan {
return this;
}
+ default InnerTableScan withRowIds(List<Long> indices) {
+ return this;
+ }
+
default InnerTableScan withBucket(int bucket) {
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index ed67e73380..00e6d7260b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -104,6 +104,11 @@ public final class KeyValueTableRead extends AbstractDataTableRead {
this.readType = readType;
}
+ @Override
+ public void applyRowIds(List<Long> indices) {
+ throw new UnsupportedOperationException("Does not support row ids.");
+ }
+
@Override
public InnerTableRead forceKeepDelete() {
initialized().forEach(SplitRead::forceKeepDelete);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 83dcb2e33e..4e13ca4e79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -149,6 +149,14 @@ public interface ReadBuilder extends Serializable {
*/
ReadBuilder withShard(int indexOfThisSubtask, int numberOfParallelSubtasks);
+    /**
+     * Specify the row ids to be read. This is usually used to read specific rows in a
+     * data-evolution table.
+     *
+     * @param indices the row ids to be read
+     */
+    ReadBuilder withRowIds(List<Long> indices);
+
/** Delete stats in scan plan result. */
ReadBuilder dropStats();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 662ac19858..84cc95a79f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.Filter;
import javax.annotation.Nullable;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -59,6 +60,7 @@ public class ReadBuilderImpl implements ReadBuilder {
private Filter<Integer> bucketFilter;
private @Nullable RowType readType;
+ private @Nullable List<Long> indices;
private boolean dropStats = false;
@@ -143,6 +145,12 @@ public class ReadBuilderImpl implements ReadBuilder {
return this;
}
+ @Override
+ public ReadBuilder withRowIds(List<Long> indices) {
+ this.indices = indices;
+ return this;
+ }
+
@Override
public ReadBuilder withBucket(int bucket) {
this.specifiedBucket = bucket;
@@ -182,7 +190,10 @@ public class ReadBuilderImpl implements ReadBuilder {
// `filter` may contains partition related predicate, but `partitionFilter` will overwrite
// it if `partitionFilter` is not null. So we must avoid to put part of partition filter in
// `filter`, another part in `partitionFilter`
-        scan.withFilter(filter).withReadType(readType).withPartitionFilter(partitionFilter);
+ scan.withFilter(filter)
+ .withReadType(readType)
+ .withPartitionFilter(partitionFilter)
+ .withRowIds(indices);
checkState(
bucketFilter == null || shardIndexOfThisSubtask == null,
"Bucket filter and shard configuration cannot be used
together. "
@@ -220,6 +231,9 @@ public class ReadBuilderImpl implements ReadBuilder {
if (limit != null) {
read.withLimit(limit);
}
+ if (indices != null) {
+ read.withRowIds(indices);
+ }
return read;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index ed80f3f92a..2e9b43c9fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -110,6 +110,8 @@ public interface SnapshotReader {
SnapshotReader withMetricRegistry(MetricRegistry registry);
+ SnapshotReader withRowIds(List<Long> indices);
+
/** Get splits plan from snapshot. */
Plan read();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 038f4fec18..df3435589d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -306,6 +306,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader withRowIds(List<Long> indices) {
+ scan.withRowIds(indices);
+ return this;
+ }
+
@Override
public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
scan.withDataFileNameFilter(fileNameFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
index 5f227b6a66..9e03c46180 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
@@ -29,6 +29,8 @@ import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+
/** A {@link SplitReadProvider} to create {@link DataEvolutionSplitRead}. */
public class DataEvolutionSplitReadProvider implements SplitReadProvider {
@@ -54,6 +56,9 @@ public class DataEvolutionSplitReadProvider implements SplitReadProvider {
Set<Long> firstRowIds = new HashSet<>();
for (DataFileMeta file : files) {
+ if (isBlobFile(file.fileName())) {
+ return true;
+ }
Long current = file.firstRowId();
if (current == null
|| !file.fileSource().isPresent()
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 3fd6a8d8af..018b6add66 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -425,6 +425,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader withRowIds(List<Long> indices) {
+ wrapped.withRowIds(indices);
+ return this;
+ }
+
@Override
public Plan read() {
return wrapped.read();
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index fad49d83fd..539f5a0d6c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -28,15 +28,21 @@ import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.DataEvolutionSplitRead;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.junit.jupiter.api.Test;
+import javax.annotation.Nonnull;
+
import java.io.ByteArrayInputStream;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -127,6 +133,53 @@ public class BlobTableTest extends TableTestBase {
assertThat(integer.get()).isEqualTo(200);
}
+ @Test
+ public void testRowIdPushDown() throws Exception {
+ createTableDefault();
+ writeDataDefault(
+ new Iterable<InternalRow>() {
+ @Nonnull
+ @Override
+ public Iterator<InternalRow> iterator() {
+ return new Iterator<InternalRow>() {
+ int i = 0;
+
+ @Override
+ public boolean hasNext() {
+ return i < 200;
+ }
+
+ @Override
+ public InternalRow next() {
+ i++;
+ return (i - 1) == 100
+ ? GenericRow.of(
+ i,
+                                                BinaryString.fromString("nice"),
+ new BlobData(
+                                                        "This is the specified message".getBytes()))
+ : dataDefault(0, 0);
+ }
+ };
+ }
+ });
+
+ Table table = getTableDefault();
+        ReadBuilder readBuilder =
+                table.newReadBuilder().withRowIds(Collections.singletonList(100L));
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+ AtomicInteger i = new AtomicInteger(0);
+ reader.forEachRemaining(
+ row -> {
+ i.getAndIncrement();
+ assertThat(row.getBlob(2).toData())
+                            .isEqualTo("This is the specified message".getBytes());
+ });
+ assertThat(i.get()).isEqualTo(1);
+ }
+
protected Schema schemaDefault() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 6ddc9b410d..65cd919e63 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -427,6 +428,178 @@ public class DataEvolutionTableTest extends TableTestBase {
assertThat(readBuilder.newScan().plan().splits().isEmpty()).isTrue();
}
+ @Test
+ public void testWithRowIds() throws Exception {
+ createTableDefault();
+ Schema schema = schemaDefault();
+ BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+ // Write first batch of data with firstRowId = 0
+        RowType writeType0 = schema.rowType().project(Arrays.asList("f0", "f1"));
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+ write0.write(GenericRow.of(1, BinaryString.fromString("a")));
+ write0.write(GenericRow.of(2, BinaryString.fromString("b")));
+
+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> commitables = write0.prepareCommit();
+ setFirstRowId(commitables, 0L);
+ commit.commit(commitables);
+ }
+
+ // Write second batch of data with firstRowId = 2
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+ write0.write(GenericRow.of(3, BinaryString.fromString("c")));
+ write0.write(GenericRow.of(4, BinaryString.fromString("d")));
+
+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> commitables = write0.prepareCommit();
+ setFirstRowId(commitables, 2L);
+ commit.commit(commitables);
+ }
+
+ // Write third batch of data with firstRowId = 4
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+ write0.write(GenericRow.of(5, BinaryString.fromString("e")));
+ write0.write(GenericRow.of(6, BinaryString.fromString("f")));
+
+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> commitables = write0.prepareCommit();
+ setFirstRowId(commitables, 4L);
+ commit.commit(commitables);
+ }
+
+ ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+
+ // Test 1: Filter by row IDs that exist in the first file (0, 1)
+ List<Long> rowIds1 = Arrays.asList(0L, 1L);
+        List<Split> splits1 = readBuilder.withRowIds(rowIds1).newScan().plan().splits();
+        assertThat(splits1.size())
+                .isEqualTo(1); // Should return one split containing the first file
+
+        // Verify the split contains only the first file (firstRowId=0, rowCount=2)
+ DataSplit dataSplit1 = (DataSplit) splits1.get(0);
+ assertThat(dataSplit1.dataFiles().size()).isEqualTo(1);
+ DataFileMeta file1 = dataSplit1.dataFiles().get(0);
+ assertThat(file1.firstRowId()).isEqualTo(0L);
+ assertThat(file1.rowCount()).isEqualTo(2L);
+
+ // Test 2: Filter by row IDs that exist in the second file (2, 3)
+ List<Long> rowIds2 = Arrays.asList(2L, 3L);
+        List<Split> splits2 = readBuilder.withRowIds(rowIds2).newScan().plan().splits();
+        assertThat(splits2.size())
+                .isEqualTo(1); // Should return one split containing the second file
+
+        // Verify the split contains only the second file (firstRowId=2, rowCount=2)
+ DataSplit dataSplit2 = (DataSplit) splits2.get(0);
+ assertThat(dataSplit2.dataFiles().size()).isEqualTo(1);
+ DataFileMeta file2 = dataSplit2.dataFiles().get(0);
+ assertThat(file2.firstRowId()).isEqualTo(2L);
+ assertThat(file2.rowCount()).isEqualTo(2L);
+
+ // Test 3: Filter by row IDs that exist in the third file (4, 5)
+ List<Long> rowIds3 = Arrays.asList(4L, 5L);
+        List<Split> splits3 = readBuilder.withRowIds(rowIds3).newScan().plan().splits();
+        assertThat(splits3.size())
+                .isEqualTo(1); // Should return one split containing the third file
+
+        // Verify the split contains only the third file (firstRowId=4, rowCount=2)
+ DataSplit dataSplit3 = (DataSplit) splits3.get(0);
+ assertThat(dataSplit3.dataFiles().size()).isEqualTo(1);
+ DataFileMeta file3 = dataSplit3.dataFiles().get(0);
+ assertThat(file3.firstRowId()).isEqualTo(4L);
+ assertThat(file3.rowCount()).isEqualTo(2L);
+
+        // Test 4: Filter by row IDs that span multiple files (0, 1, 4)
+        List<Long> rowIds4 = Arrays.asList(0L, 1L, 4L);
+        List<Split> splits4 = readBuilder.withRowIds(rowIds4).newScan().plan().splits();
+        assertThat(splits4.size())
+                .isEqualTo(1); // Should return one split containing all matching files
+
+        // Verify the split contains the two matching files (firstRowId=0 and firstRowId=4)
+ DataSplit dataSplit4 = (DataSplit) splits4.get(0);
+ assertThat(dataSplit4.dataFiles().size()).isEqualTo(2);
+
+        // Check that both matching files are present with correct firstRowIds
+ List<Long> actualFirstRowIds =
+ dataSplit4.dataFiles().stream()
+ .map(DataFileMeta::firstRowId)
+ .sorted()
+ .collect(Collectors.toList());
+ assertThat(actualFirstRowIds.size()).isEqualTo(2);
+ // Verify we have the expected firstRowIds: 0, 4
+ boolean has0 = actualFirstRowIds.stream().anyMatch(id -> id == 0L);
+ boolean has4 = actualFirstRowIds.stream().anyMatch(id -> id == 4L);
+ assertThat(has0).isTrue();
+ assertThat(has4).isTrue();
+
+ // Verify each file has the correct row count
+ for (DataFileMeta file : dataSplit4.dataFiles()) {
+ assertThat(file.rowCount()).isEqualTo(2L);
+ }
+
+ // Test 5: Filter by row IDs that don't exist (10, 11)
+ List<Long> rowIds5 = Arrays.asList(10L, 11L);
+        List<Split> splits5 = readBuilder.withRowIds(rowIds5).newScan().plan().splits();
+ assertThat(splits5.size()).isEqualTo(0); // Should return no files
+
+ // Test 6: Filter by null indices (should return all files)
+        List<Split> splits6 = readBuilder.withRowIds(null).newScan().plan().splits();
+        assertThat(splits6.size()).isEqualTo(1); // Should return one split containing all files
+
+ // Verify the split contains all three files
+ DataSplit dataSplit6 = (DataSplit) splits6.get(0);
+ assertThat(dataSplit6.dataFiles().size()).isEqualTo(3);
+
+ // Check that all three files are present with correct firstRowIds
+ List<Long> allFirstRowIds =
+ dataSplit6.dataFiles().stream()
+ .map(DataFileMeta::firstRowId)
+ .sorted()
+ .collect(Collectors.toList());
+ assertThat(allFirstRowIds.size()).isEqualTo(3);
+ // Verify we have the expected firstRowIds: 0, 2, 4
+ boolean has0All = allFirstRowIds.stream().anyMatch(id -> id == 0L);
+ boolean has2All = allFirstRowIds.stream().anyMatch(id -> id == 2L);
+ boolean has4All = allFirstRowIds.stream().anyMatch(id -> id == 4L);
+ assertThat(has0All).isTrue();
+ assertThat(has2All).isTrue();
+ assertThat(has4All).isTrue();
+
+ // Test 7: Filter by empty indices (should return no files)
+        List<Split> splits7 =
+                readBuilder.withRowIds(Collections.emptyList()).newScan().plan().splits();
+ assertThat(splits7.size()).isEqualTo(0); // Should return no files
+
+ // Test 8: Filter by row IDs that partially exist (0, 1, 10)
+ List<Long> rowIds8 = Arrays.asList(0L, 1L, 10L);
+        List<Split> splits8 = readBuilder.withRowIds(rowIds8).newScan().plan().splits();
+        assertThat(splits8.size())
+                .isEqualTo(1); // Should return one split containing the first file
+
+ // Verify the split contains only the first file (firstRowId=0)
+ DataSplit dataSplit8 = (DataSplit) splits8.get(0);
+ assertThat(dataSplit8.dataFiles().size()).isEqualTo(1);
+ DataFileMeta file8 = dataSplit8.dataFiles().get(0);
+ assertThat(file8.firstRowId()).isEqualTo(0L);
+ assertThat(file8.rowCount()).isEqualTo(2L);
+
+ List<Long> rowIds9 = Arrays.asList(0L, 2L);
+        List<Split> splits9 = readBuilder.withRowIds(rowIds9).newScan().plan().splits();
+
+ // Verify the actual data by reading from the filtered splits
+        // Note: withRowIds filters at the file level, so we get all rows from matching files
+        RecordReader<InternalRow> reader9 = readBuilder.newRead().createReader(splits9);
+ AtomicInteger i = new AtomicInteger(0);
+ reader9.forEachRemaining(
+ row -> {
+ assertThat(row.getInt(0)).isEqualTo(1 + i.get() * 2);
+ assertThat(row.getString(1).toString())
+                            .isEqualTo(new String(new char[] {(char) ('a' + i.get() * 2)}));
+ i.getAndIncrement();
+ });
+ assertThat(i.get()).isEqualTo(2);
+ }
+
protected Schema schemaDefault() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
index 7e44afe6fb..9287e931aa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
@@ -100,6 +100,8 @@ public class DataEvolutionSplitReadProviderTest {
when(file1.firstRowId()).thenReturn(1L);
when(file2.firstRowId()).thenReturn(null);
when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+ when(file1.fileName()).thenReturn("test1.parquet");
+ when(file2.fileName()).thenReturn("test2.parquet");
assertThat(provider.match(split, false)).isFalse();
}
@@ -113,6 +115,8 @@ public class DataEvolutionSplitReadProviderTest {
when(file1.firstRowId()).thenReturn(1L);
when(file2.firstRowId()).thenReturn(2L);
when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+ when(file1.fileName()).thenReturn("test1.parquet");
+ when(file2.fileName()).thenReturn("test2.parquet");
assertThat(provider.match(split, false)).isFalse();
}
@@ -128,6 +132,8 @@ public class DataEvolutionSplitReadProviderTest {
when(file1.firstRowId()).thenReturn(100L);
when(file2.firstRowId()).thenReturn(100L);
when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+ when(file1.fileName()).thenReturn("test1.parquet");
+ when(file2.fileName()).thenReturn("test2.parquet");
// The forceKeepDelete parameter is not used in match, so test both values
assertThat(provider.match(split, true)).isTrue();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index bbc0bcda17..11d6f1472e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
import java.io.IOException;
+import java.util.List;
import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
@@ -54,6 +55,11 @@ public class LookupCompactDiffRead extends AbstractDataTableRead {
incrementalDiffRead.withReadType(readType);
}
+ @Override
+ public void applyRowIds(List<Long> indices) {
+ throw new UnsupportedOperationException("Does not support row ids.");
+ }
+
@Override
public RecordReader<InternalRow> reader(Split split) throws IOException {
DataSplit dataSplit = (DataSplit) split;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index 46f8be153e..eb860cbb17 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -43,6 +43,7 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
private final Path filePath;
private final long[] blobLengths;
private final long[] blobOffsets;
+ private final int[] returnedPositions;
private boolean returned;
@@ -74,8 +75,10 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
offset += blobLengths[i];
}
+ int[] returnedPositions = null;
if (selection != null) {
int cardinality = (int) selection.getCardinality();
+ returnedPositions = new int[cardinality];
long[] newLengths = new long[cardinality];
long[] newOffsets = new long[cardinality];
Iterator<Integer> iterator = selection.iterator();
@@ -83,11 +86,13 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
Integer next = iterator.next();
newLengths[i] = blobLengths[next];
newOffsets[i] = blobOffsets[next];
+ returnedPositions[i] = next;
}
blobLengths = newLengths;
blobOffsets = newOffsets;
}
+ this.returnedPositions = returnedPositions;
this.blobLengths = blobLengths;
this.blobOffsets = blobOffsets;
}
@@ -107,7 +112,9 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
@Override
public long returnedPosition() {
- return currentPosition;
+ return returnedPositions == null
+ ? currentPosition
+ : returnedPositions[currentPosition - 1];
}
@Override
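
With a selection applied, the blob reader compacts blobLengths and blobOffsets
down to the selected entries and records each entry's original position, so
returnedPosition() keeps reporting file positions rather than indexes into the
compacted arrays. A worked example with an assumed selection:

    // selection = {2, 5} -> blobLengths/blobOffsets shrink to 2 entries,
    // returnedPositions = [2, 5]
    // after reading entry 0: returnedPosition() == 2 (not 0)
    // after reading entry 1: returnedPosition() == 5 (not 1)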