This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1c5f2decec [flink][spark] fix incorrect first_row_id range in
DataEvolution MergeInto (#7790)
1c5f2decec is described below
commit 1c5f2dececffeec36f680e8d72ca375550d26ee7
Author: Faiz <[email protected]>
AuthorDate: Sat May 9 14:14:06 2026 +0800
[flink][spark] fix incorrect first_row_id range in DataEvolution MergeInto
(#7790)
The current first row ID check in DataEvolutionPartialWriter may be incorrect
because of special files, i.e. blob files and vector files, which may
cause:
```text
java.lang.AssertionError: assertion failed: Number of written records 2419
does not match expected number 244 for first row ID 19352.
```
This is because the blob file's record count overrides the normal file's
record count for the same first row ID:
<img width="2244" height="280" alt="image"
src="https://github.com/user-attachments/assets/1338caea-46ee-4310-96e5-57c8336dc6c6"
/>
We should filter out special files when calculating the first_row_id to
record_count mapping.
---
.../DataEvolutionPartialWriteOperator.java | 8 +++-
.../action/DataEvolutionMergeIntoActionITCase.java | 55 ++++++++++++++++++++++
.../spark/commands/DataEvolutionPaimonWriter.scala | 3 ++
.../MergeIntoPaimonDataEvolutionTable.scala | 8 +++-
.../org/apache/paimon/spark/sql/BlobTestBase.scala | 55 ++++++++++++++++++++++
5 files changed, 126 insertions(+), 3 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
index 8b7053c9d2..278d58306c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.dataevolution;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.Committable;
@@ -56,6 +57,7 @@ import java.util.TreeSet;
import java.util.stream.Collectors;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.types.VectorType.isVectorStoreFile;
/**
* The Flink Batch Operator to process sorted new rows for data-evolution
partial write. It assumes
@@ -90,7 +92,8 @@ public class DataEvolutionPartialWriteOperator
private transient Writer writer;
public DataEvolutionPartialWriteOperator(FileStoreTable table, RowType
dataType) {
- this.table = table;
+ this.table =
+
table.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999
G"));
List<String> fieldNames =
dataType.getFieldNames().stream()
.filter(name ->
!SpecialFields.ROW_ID.name().equals(name))
@@ -116,7 +119,8 @@ public class DataEvolutionPartialWriteOperator
.withManifestEntryFilter(
entry ->
entry.file().firstRowId() != null
- &&
!isBlobFile(entry.file().fileName()))
+ &&
!isBlobFile(entry.file().fileName())
+ &&
!isVectorStoreFile(entry.file().fileName()))
.plan()
.files();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
index 102cd78e8b..96edb8d75a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -721,6 +721,61 @@ public class DataEvolutionMergeIntoActionITCase extends
ActionITCaseBase {
"Expected error about raw-data BLOB column but got: " +
t.getMessage());
}
+ @Test
+ public void testUpdateNonBlobColumnOnRawBlobTableWithSplitFiles() throws
Exception {
+ sEnv.executeSql(
+ buildDdl(
+ "RAW_BLOB_SPLIT_T",
+ Arrays.asList("id INT", "name STRING", "picture
BYTES"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<String, String>() {
+ {
+ put(ROW_TRACKING_ENABLED.key(), "true");
+ put(DATA_EVOLUTION_ENABLED.key(), "true");
+ put("blob-field", "picture");
+ put(CoreOptions.BLOB_TARGET_FILE_SIZE.key(),
"1 b");
+ }
+ }));
+ insertInto(
+ "RAW_BLOB_SPLIT_T",
+ "(1, 'name1', X'48656C6C6F')",
+ "(2, 'name2', X'5945')",
+ "(3, 'name3', X'414243')");
+ testBatchRead(
+ "SELECT COUNT(*) FROM `RAW_BLOB_SPLIT_T$files` "
+ + "WHERE file_path NOT LIKE '%.blob'",
+ Collections.singletonList(changelogRow("+I", 1L)));
+ testBatchRead(
+ "SELECT COUNT(*) > 1 FROM `RAW_BLOB_SPLIT_T$files` "
+ + "WHERE file_path LIKE '%.blob'",
+ Collections.singletonList(changelogRow("+I", true)));
+
+ sEnv.executeSql(
+ buildDdl(
+ "RAW_BLOB_SPLIT_S",
+ Arrays.asList("id INT", "name STRING"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyMap()));
+ insertInto("RAW_BLOB_SPLIT_S", "(1, 'updated_name1')");
+
+ builder(warehouse, database, "RAW_BLOB_SPLIT_T")
+ .withMergeCondition("RAW_BLOB_SPLIT_T.id=RAW_BLOB_SPLIT_S.id")
+
.withMatchedUpdateSet("RAW_BLOB_SPLIT_T.name=RAW_BLOB_SPLIT_S.name")
+ .withSourceTable("RAW_BLOB_SPLIT_S")
+ .withSinkParallelism(1)
+ .build()
+ .run();
+
+ List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", 1, "updated_name1"),
+ changelogRow("+I", 2, "name2"),
+ changelogRow("+I", 3, "name3"));
+ testBatchRead("SELECT id, name FROM RAW_BLOB_SPLIT_T ORDER BY id",
expected);
+ }
+
@Test
public void testUpdateNonBlobColumnOnDescriptorBlobTableSucceeds() throws
Exception {
// Create a table with descriptor BLOB column.
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
index e2122bd80a..2b78333223 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
@@ -20,12 +20,14 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions
import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile
import org.apache.paimon.spark.write.{DataEvolutionTableDataWrite,
WriteHelper, WriteTaskResult}
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink._
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.DataType
import org.apache.paimon.types.DataTypeRoot.BLOB
+import org.apache.paimon.types.VectorType.isVectorStoreFile
import org.apache.spark.sql._
@@ -44,6 +46,7 @@ case class DataEvolutionPaimonWriter(paimonTable:
FileStoreTable, dataSplits: Se
split
.dataFiles()
.asScala
+ .filter(file => !isBlobFile(file.fileName()) &&
!isVectorStoreFile(file.fileName()))
.foreach(
file =>
firstRowIdToPartitionMap
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 492d64bbf5..6920b44015 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -32,6 +32,7 @@ import
org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.types.VectorType.isVectorStoreFile
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.PaimonUtils._
@@ -150,7 +151,12 @@ case class MergeIntoPaimonDataEvolutionTable(
val firstRowIds: immutable.IndexedSeq[Long] = tableSplits
.flatMap(_.dataFiles().asScala)
- .filter(file => file.firstRowId() != null &&
!isBlobFile(file.fileName()))
+ .filter {
+ file =>
+ file.firstRowId() != null &&
+ !isBlobFile(file.fileName()) &&
+ !isVectorStoreFile(file.fileName())
+ }
.map(file => file.firstRowId().asInstanceOf[Long])
.distinct
.sorted
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index e95b63785d..2ff6eb308d 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -339,6 +339,61 @@ class BlobTestBase extends PaimonSparkTestBase {
}
}
+ test("Blob: merge-into updates non-blob column on raw blob table with split
blob files") {
+ withTable("s", "t") {
+ sql(
+ "CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " +
+ "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " +
+ "'blob-field'='picture', 'blob.target-file-size'='1 b')")
+ sql(
+ "INSERT INTO t VALUES " +
+ "(1, 'name1', X'48656C6C6F'), " +
+ "(2, 'name2', X'5945'), " +
+ "(3, 'name3', X'414243')")
+
+ sql("CREATE TABLE s (id INT, name STRING)")
+ sql("INSERT INTO s VALUES (1, 'updated_name1')")
+
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN UPDATE SET t.name = s.name
+ |""".stripMargin)
+
+ checkAnswer(
+ sql("SELECT id, name FROM t ORDER BY id"),
+ Seq(Row(1, "updated_name1"), Row(2, "name2"), Row(3, "name3"))
+ )
+ }
+ }
+
+ test("Blob: self merge reads raw blob column to update non-blob column") {
+ withTable("t") {
+ sql(
+ "CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " +
+ "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " +
+ "'blob-field'='picture', 'blob.target-file-size'='1 b')")
+ sql(
+ "INSERT INTO t VALUES " +
+ "(1, 'name1', X'48656C6C6F'), " +
+ "(2, 'name2', X'5945'), " +
+ "(3, 'name3', X'414243')")
+
+ sql("""
+ |MERGE INTO t
+ |USING t AS source
+ |ON t._ROW_ID = source._ROW_ID
+ |WHEN MATCHED THEN UPDATE SET t.name = CAST(length(source.picture)
AS STRING)
+ |""".stripMargin)
+
+ checkAnswer(
+ sql("SELECT id, name FROM t ORDER BY id"),
+ Seq(Row(1, "5"), Row(2, "2"), Row(3, "3"))
+ )
+ }
+ }
+
test("Blob: merge-into updates non-blob column on descriptor blob table") {
withTable("s", "t") {
sql(