This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ae5b7be9e5 [core] Fix: blob meta should contain a filter to match normal data file meta (#6412)
ae5b7be9e5 is described below
commit ae5b7be9e54edee48d024e6a41a36c3dd01a8a24
Author: YeJunHao <[email protected]>
AuthorDate: Thu Oct 16 19:44:20 2025 +0800
[core] Fix: blob meta should contain a filter to match normal data file meta (#6412)
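
With data evolution enabled, the scanned file list can contain blob file
metas alongside the normal data file metas. DataEvolutionSplitGenerator now
filters the blob metas before splitting by row id: a blob file meta is kept
only when its firstRowId falls inside the row-id range of the most recent
normal data file meta, so blob files without a matching normal data file no
longer leak into splits.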
---
.../table/source/DataEvolutionSplitGenerator.java | 24 ++++++++++++++++++++++
.../org/apache/paimon/spark/sql/BlobTestBase.scala | 12 +++++++----
2 files changed, 32 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
index 8b758166b5..4edadb65ef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
@@ -97,6 +97,8 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
f2.maxSequenceNumber(),
f1.maxSequenceNumber());
}));
+ files = filterBlob(files);
+
// Split files by firstRowId
long lastRowId = -1;
long checkRowIdStart = 0;
@@ -128,4 +130,26 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
return splitByRowId;
}
+
+ private static List<DataFileMeta> filterBlob(List<DataFileMeta> files) {
+ List<DataFileMeta> result = new ArrayList<>();
+ long rowIdStart = -1;
+ long rowIdEnd = -1;
+ for (DataFileMeta file : files) {
+ if (file.firstRowId() == null) {
+ result.add(file);
+ continue;
+ }
+ if (!isBlobFile(file.fileName())) {
+ rowIdStart = file.firstRowId();
+ rowIdEnd = file.firstRowId() + file.rowCount();
+ result.add(file);
+ } else {
+ if (file.firstRowId() >= rowIdStart && file.firstRowId() < rowIdEnd) {
+ result.add(file);
+ }
+ }
+ }
+ return result;
+ }
}
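
For illustration, here is a minimal standalone sketch of the filtering rule
added above. FileMeta is a hypothetical stand-in for Paimon's DataFileMeta,
and the ".blob" suffix check stands in for isBlobFile; both are assumptions
for the sketch, not Paimon API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BlobFilterSketch {
    // Only the fields the filter reads: file name, first row id, row count.
    record FileMeta(String fileName, Long firstRowId, long rowCount) {}

    // Assumption for the sketch: blob files are recognized by suffix.
    static boolean isBlobFile(String fileName) {
        return fileName.endsWith(".blob");
    }

    // Mirrors filterBlob above: a blob meta survives only when its firstRowId
    // lies inside the row-id range of the most recent normal data file meta.
    static List<FileMeta> filterBlob(List<FileMeta> files) {
        List<FileMeta> result = new ArrayList<>();
        long rowIdStart = -1;
        long rowIdEnd = -1;
        for (FileMeta file : files) {
            if (file.firstRowId() == null) {
                result.add(file); // no row-id tracking: keep unconditionally
                continue;
            }
            if (!isBlobFile(file.fileName())) {
                // Normal data file: remember its row-id range [start, end).
                rowIdStart = file.firstRowId();
                rowIdEnd = file.firstRowId() + file.rowCount();
                result.add(file);
            } else if (file.firstRowId() >= rowIdStart && file.firstRowId() < rowIdEnd) {
                result.add(file);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<FileMeta> files =
                Arrays.asList(
                        new FileMeta("data-0.parquet", 0L, 5),
                        new FileMeta("blob-0.blob", 0L, 5), // kept: 0 is in [0, 5)
                        new FileMeta("blob-1.blob", 10L, 3)); // dropped: 10 is not in [0, 5)
        // Prints data-0.parquet and blob-0.blob; blob-1.blob has no matching
        // normal data file range and is filtered out.
        filterBlob(files).forEach(f -> System.out.println(f.fileName()));
    }
}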
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 4d18837a72..9f7b3a8b5c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -65,10 +65,14 @@ class BlobTestBase extends PaimonSparkTestBase {
sql(
"CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES
('row-tracking.enabled'='true', 'data-evolution.enabled'='true',
'blob-field'='picture', 'blob-as-descriptor'='true')")
- sql("INSERT INTO t VALUES (1, 'paimon', X'" +
bytesToHex(blobDescriptor.serialize()) + "')")
-
+ sql(
+ "INSERT INTO t VALUES (1, 'paimon', X'" +
bytesToHex(blobDescriptor.serialize()) + "'),"
+ + "(5, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
+ + "(2, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
+ + "(3, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
+ + "(4, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')")
val newDescriptorBytes =
- sql("SELECT picture FROM
t").collect()(0).get(0).asInstanceOf[Array[Byte]]
+ sql("SELECT picture FROM t WHERE id =
1").collect()(0).get(0).asInstanceOf[Array[Byte]]
val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
val options = new Options()
options.set("warehouse", tempDBDir.toString)
@@ -79,7 +83,7 @@ class BlobTestBase extends PaimonSparkTestBase {
sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='false')")
checkAnswer(
- sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t WHERE id = 1"),
Seq(Row(1, "paimon", blobData, 0, 1))
)
}