This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 12cc41ae9a [core] Fix row id push down: blob file may not be continuous (#6550)
12cc41ae9a is described below

commit 12cc41ae9af05aa9a8f27636f5dc0485784c7e9b
Author: YeJunHao <[email protected]>
AuthorDate: Fri Nov 7 08:33:51 2025 +0800

    [core] Fix row id push down: blob file may not be continuous (#6550)
---
 .../paimon/operation/DataEvolutionSplitRead.java   | 59 +++++++++++++++-------
 .../paimon/operation/DataEvolutionReadTest.java    | 21 +++++++-
 2 files changed, 61 insertions(+), 19 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index c0e54af576..47a247be48 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -374,6 +374,14 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
     @VisibleForTesting
     public static List<FieldBunch> splitFieldBunches(
             List<DataFileMeta> needMergeFiles, Function<DataFileMeta, Integer> 
blobFileToFieldId) {
+        return splitFieldBunches(needMergeFiles, blobFileToFieldId, false);
+    }
+
+    @VisibleForTesting
+    public static List<FieldBunch> splitFieldBunches(
+            List<DataFileMeta> needMergeFiles,
+            Function<DataFileMeta, Integer> blobFileToFieldId,
+            boolean rowIdPushDown) {
         List<FieldBunch> fieldsFiles = new ArrayList<>();
         Map<Integer, BlobBunch> blobBunchMap = new HashMap<>();
         long rowCount = -1;
@@ -382,7 +390,8 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                 int fieldId = blobFileToFieldId.apply(file);
                 final long expectedRowCount = rowCount;
                 blobBunchMap
-                        .computeIfAbsent(fieldId, key -> new 
BlobBunch(expectedRowCount))
+                        .computeIfAbsent(
+                                fieldId, key -> new 
BlobBunch(expectedRowCount, rowIdPushDown))
                         .add(file);
             } else {
                 // Normal file, just add it to the current merge split
@@ -426,16 +435,18 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
 
         final List<DataFileMeta> files;
         final long expectedRowCount;
+        final boolean rowIdPushDown;
 
         long latestFistRowId = -1;
         long expectedNextFirstRowId = -1;
         long latestMaxSequenceNumber = -1;
         long rowCount;
 
-        BlobBunch(long expectedRowCount) {
+        BlobBunch(long expectedRowCount, boolean rowIdPushDown) {
             this.files = new ArrayList<>();
             this.rowCount = 0;
             this.expectedRowCount = expectedRowCount;
+            this.rowIdPushDown = rowIdPushDown;
         }
 
         void add(DataFileMeta file) {
@@ -452,24 +463,36 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
             }
             if (!files.isEmpty()) {
                 long firstRowId = file.firstRowId();
-                if (firstRowId < expectedNextFirstRowId) {
+                if (rowIdPushDown) {
+                    if (firstRowId < expectedNextFirstRowId) {
+                        if (file.maxSequenceNumber() > 
latestMaxSequenceNumber) {
+                            files.remove(files.size() - 1);
+                        } else {
+                            return;
+                        }
+                    }
+                } else {
+                    if (firstRowId < expectedNextFirstRowId) {
+                        checkArgument(
+                                file.maxSequenceNumber() < 
latestMaxSequenceNumber,
+                                "Blob file with overlapping row id should have 
decreasing sequence number.");
+                        return;
+                    } else if (firstRowId > expectedNextFirstRowId) {
+                        throw new IllegalArgumentException(
+                                "Blob file first row id should be continuous, 
expect "
+                                        + expectedNextFirstRowId
+                                        + " but got "
+                                        + firstRowId);
+                    }
+                }
+                if (!files.isEmpty()) {
                     checkArgument(
-                            file.maxSequenceNumber() < latestMaxSequenceNumber,
-                            "Blob file with overlapping row id should have 
decreasing sequence number.");
-                    return;
-                } else if (firstRowId > expectedNextFirstRowId) {
-                    throw new IllegalArgumentException(
-                            "Blob file first row id should be continuous, 
expect "
-                                    + expectedNextFirstRowId
-                                    + " but got "
-                                    + firstRowId);
+                            file.schemaId() == files.get(0).schemaId(),
+                            "All files in a blob bunch should have the same 
schema id.");
+                    checkArgument(
+                            file.writeCols().equals(files.get(0).writeCols()),
+                            "All files in a blob bunch should have the same 
write columns.");
                 }
-                checkArgument(
-                        file.schemaId() == files.get(0).schemaId(),
-                        "All files in a blob bunch should have the same schema 
id.");
-                checkArgument(
-                        file.writeCols().equals(files.get(0).writeCols()),
-                        "All files in a blob bunch should have the same write 
columns.");
             }
             files.add(file);
             rowCount += file.rowCount();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index 91c6285fa6..6b9dd29b59 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -35,6 +35,7 @@ import java.util.List;
 
 import static 
org.apache.paimon.operation.DataEvolutionSplitRead.splitFieldBunches;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link BlobBunch}. */
@@ -44,7 +45,7 @@ public class DataEvolutionReadTest {
 
     @BeforeEach
     public void setUp() {
-        blobBunch = new BlobBunch(Long.MAX_VALUE);
+        blobBunch = new BlobBunch(Long.MAX_VALUE, false);
     }
 
     @Test
@@ -319,6 +320,24 @@ public class DataEvolutionReadTest {
                 writeCols);
     }
 
+    @Test
+    public void testRowIdPushDown() {
+        BlobBunch blobBunch = new BlobBunch(Long.MAX_VALUE, true);
+        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+        DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
+        blobBunch.add(blobEntry1);
+        BlobBunch finalBlobBunch = blobBunch;
+        DataFileMeta finalBlobEntry = blobEntry2;
+        assertThatCode(() -> 
finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException();
+
+        blobBunch = new BlobBunch(Long.MAX_VALUE, true);
+        blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+        blobEntry2 = createBlobFile("blob2", 50, 200, 2);
+        blobBunch.add(blobEntry1);
+        blobBunch.add(blobEntry2);
+        assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2);
+    }
+
     /** Creates a normal (non-blob) file for testing. */
     private DataFileMeta createNormalFile(
             String fileName,

Reply via email to