This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c5f2decec [flink][spark] fix incorrect first_row_id range in 
DataEvolution MergeInto (#7790)
1c5f2decec is described below

commit 1c5f2dececffeec36f680e8d72ca375550d26ee7
Author: Faiz <[email protected]>
AuthorDate: Sat May 9 14:14:06 2026 +0800

    [flink][spark] fix incorrect first_row_id range in DataEvolution MergeInto 
(#7790)
    
    Current first row id check in DataEvolutionPartialWriter maybe incorrect
    because of special files i.e. Blob Files and Vector FIles, which may
    cause:
    ```text
    java.lang.AssertionError: assertion failed: Number of written records 2419 
does not match expected number 244 for first row ID 19352.
    ```
    
    This is because the blob file's record count override the normal file's
    record count:
    <img width="2244" height="280" alt="image"
    
src="https://github.com/user-attachments/assets/1338caea-46ee-4310-96e5-57c8336dc6c6";
    />
    
    We should filter out special files when calculating first_row_id to
    record_count mapping
---
 .../DataEvolutionPartialWriteOperator.java         |  8 +++-
 .../action/DataEvolutionMergeIntoActionITCase.java | 55 ++++++++++++++++++++++
 .../spark/commands/DataEvolutionPaimonWriter.scala |  3 ++
 .../MergeIntoPaimonDataEvolutionTable.scala        |  8 +++-
 .../org/apache/paimon/spark/sql/BlobTestBase.scala | 55 ++++++++++++++++++++++
 5 files changed, 126 insertions(+), 3 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
index 8b7053c9d2..278d58306c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.dataevolution;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.sink.Committable;
@@ -56,6 +57,7 @@ import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.types.VectorType.isVectorStoreFile;
 
 /**
  * The Flink Batch Operator to process sorted new rows for data-evolution 
partial write. It assumes
@@ -90,7 +92,8 @@ public class DataEvolutionPartialWriteOperator
     private transient Writer writer;
 
     public DataEvolutionPartialWriteOperator(FileStoreTable table, RowType 
dataType) {
-        this.table = table;
+        this.table =
+                
table.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 
G"));
         List<String> fieldNames =
                 dataType.getFieldNames().stream()
                         .filter(name -> 
!SpecialFields.ROW_ID.name().equals(name))
@@ -116,7 +119,8 @@ public class DataEvolutionPartialWriteOperator
                         .withManifestEntryFilter(
                                 entry ->
                                         entry.file().firstRowId() != null
-                                                && 
!isBlobFile(entry.file().fileName()))
+                                                && 
!isBlobFile(entry.file().fileName())
+                                                && 
!isVectorStoreFile(entry.file().fileName()))
                         .plan()
                         .files();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
index 102cd78e8b..96edb8d75a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -721,6 +721,61 @@ public class DataEvolutionMergeIntoActionITCase extends 
ActionITCaseBase {
                 "Expected error about raw-data BLOB column but got: " + 
t.getMessage());
     }
 
+    @Test
+    public void testUpdateNonBlobColumnOnRawBlobTableWithSplitFiles() throws 
Exception {
+        sEnv.executeSql(
+                buildDdl(
+                        "RAW_BLOB_SPLIT_T",
+                        Arrays.asList("id INT", "name STRING", "picture 
BYTES"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<String, String>() {
+                            {
+                                put(ROW_TRACKING_ENABLED.key(), "true");
+                                put(DATA_EVOLUTION_ENABLED.key(), "true");
+                                put("blob-field", "picture");
+                                put(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), 
"1 b");
+                            }
+                        }));
+        insertInto(
+                "RAW_BLOB_SPLIT_T",
+                "(1, 'name1', X'48656C6C6F')",
+                "(2, 'name2', X'5945')",
+                "(3, 'name3', X'414243')");
+        testBatchRead(
+                "SELECT COUNT(*) FROM `RAW_BLOB_SPLIT_T$files` "
+                        + "WHERE file_path NOT LIKE '%.blob'",
+                Collections.singletonList(changelogRow("+I", 1L)));
+        testBatchRead(
+                "SELECT COUNT(*) > 1 FROM `RAW_BLOB_SPLIT_T$files` "
+                        + "WHERE file_path LIKE '%.blob'",
+                Collections.singletonList(changelogRow("+I", true)));
+
+        sEnv.executeSql(
+                buildDdl(
+                        "RAW_BLOB_SPLIT_S",
+                        Arrays.asList("id INT", "name STRING"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyMap()));
+        insertInto("RAW_BLOB_SPLIT_S", "(1, 'updated_name1')");
+
+        builder(warehouse, database, "RAW_BLOB_SPLIT_T")
+                .withMergeCondition("RAW_BLOB_SPLIT_T.id=RAW_BLOB_SPLIT_S.id")
+                
.withMatchedUpdateSet("RAW_BLOB_SPLIT_T.name=RAW_BLOB_SPLIT_S.name")
+                .withSourceTable("RAW_BLOB_SPLIT_S")
+                .withSinkParallelism(1)
+                .build()
+                .run();
+
+        List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1, "updated_name1"),
+                        changelogRow("+I", 2, "name2"),
+                        changelogRow("+I", 3, "name3"));
+        testBatchRead("SELECT id, name FROM RAW_BLOB_SPLIT_T ORDER BY id", 
expected);
+    }
+
     @Test
     public void testUpdateNonBlobColumnOnDescriptorBlobTableSucceeds() throws 
Exception {
         // Create a table with descriptor BLOB column.
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
index e2122bd80a..2b78333223 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
@@ -20,12 +20,14 @@ package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile
 import org.apache.paimon.spark.write.{DataEvolutionTableDataWrite, 
WriteHelper, WriteTaskResult}
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink._
 import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.types.DataType
 import org.apache.paimon.types.DataTypeRoot.BLOB
+import org.apache.paimon.types.VectorType.isVectorStoreFile
 
 import org.apache.spark.sql._
 
@@ -44,6 +46,7 @@ case class DataEvolutionPaimonWriter(paimonTable: 
FileStoreTable, dataSplits: Se
         split
           .dataFiles()
           .asScala
+          .filter(file => !isBlobFile(file.fileName()) && 
!isVectorStoreFile(file.fileName()))
           .foreach(
             file =>
               firstRowIdToPartitionMap
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 492d64bbf5..6920b44015 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -32,6 +32,7 @@ import 
org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
 import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.types.VectorType.isVectorStoreFile
 
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils._
@@ -150,7 +151,12 @@ case class MergeIntoPaimonDataEvolutionTable(
 
     val firstRowIds: immutable.IndexedSeq[Long] = tableSplits
       .flatMap(_.dataFiles().asScala)
-      .filter(file => file.firstRowId() != null && 
!isBlobFile(file.fileName()))
+      .filter {
+        file =>
+          file.firstRowId() != null &&
+          !isBlobFile(file.fileName()) &&
+          !isVectorStoreFile(file.fileName())
+      }
       .map(file => file.firstRowId().asInstanceOf[Long])
       .distinct
       .sorted
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index e95b63785d..2ff6eb308d 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -339,6 +339,61 @@ class BlobTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Blob: merge-into updates non-blob column on raw blob table with split 
blob files") {
+    withTable("s", "t") {
+      sql(
+        "CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " +
+          "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " +
+          "'blob-field'='picture', 'blob.target-file-size'='1 b')")
+      sql(
+        "INSERT INTO t VALUES " +
+          "(1, 'name1', X'48656C6C6F'), " +
+          "(2, 'name2', X'5945'), " +
+          "(3, 'name3', X'414243')")
+
+      sql("CREATE TABLE s (id INT, name STRING)")
+      sql("INSERT INTO s VALUES (1, 'updated_name1')")
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN MATCHED THEN UPDATE SET t.name = s.name
+            |""".stripMargin)
+
+      checkAnswer(
+        sql("SELECT id, name FROM t ORDER BY id"),
+        Seq(Row(1, "updated_name1"), Row(2, "name2"), Row(3, "name3"))
+      )
+    }
+  }
+
+  test("Blob: self merge reads raw blob column to update non-blob column") {
+    withTable("t") {
+      sql(
+        "CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " +
+          "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " +
+          "'blob-field'='picture', 'blob.target-file-size'='1 b')")
+      sql(
+        "INSERT INTO t VALUES " +
+          "(1, 'name1', X'48656C6C6F'), " +
+          "(2, 'name2', X'5945'), " +
+          "(3, 'name3', X'414243')")
+
+      sql("""
+            |MERGE INTO t
+            |USING t AS source
+            |ON t._ROW_ID = source._ROW_ID
+            |WHEN MATCHED THEN UPDATE SET t.name = CAST(length(source.picture) 
AS STRING)
+            |""".stripMargin)
+
+      checkAnswer(
+        sql("SELECT id, name FROM t ORDER BY id"),
+        Seq(Row(1, "5"), Row(2, "2"), Row(3, "3"))
+      )
+    }
+  }
+
   test("Blob: merge-into updates non-blob column on descriptor blob table") {
     withTable("s", "t") {
       sql(

Reply via email to