This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e1dcc85ed2 [core] Refactor FieldBunch in DataEvolutionSplitRead
e1dcc85ed2 is described below

commit e1dcc85ed2a81a37fde6904a6ae8da3a611e2aa8
Author: JingsongLi <[email protected]>
AuthorDate: Mon Sep 29 16:38:28 2025 +0800

    [core] Refactor FieldBunch in DataEvolutionSplitRead
---
 .../java/org/apache/paimon/io/DataFileMeta.java    |   5 -
 .../paimon/operation/DataEvolutionSplitRead.java   | 156 ++++++++-------------
 .../paimon/operation/FileStoreCommitImpl.java      |   3 +-
 .../table/source/DataEvolutionSplitGenerator.java  |   5 +-
 .../org/apache/paimon/append/BlobTableTest.java    |  12 +-
 .../paimon/operation/DataEvolutionReadTest.java    |  37 ++---
 .../apache/paimon/format/blob/BlobFileFormat.java  |   4 +
 7 files changed, 89 insertions(+), 133 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 7f266d4554..e7bb8b9571 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -21,7 +21,6 @@ package org.apache.paimon.io;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.format.blob.BlobFileFormatFactory;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.stats.SimpleStats;
@@ -257,10 +256,6 @@ public interface DataFileMeta {
 
     String fileName();
 
-    default boolean isBlobFile() {
-        return fileName().endsWith("." + BlobFileFormatFactory.IDENTIFIER);
-    }
-
     long fileSize();
 
     long rowCount();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index ead9fa18e6..f2f15dcbbd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -44,7 +44,6 @@ import 
org.apache.paimon.table.source.DataEvolutionSplitGenerator;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Either;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.FormatReaderMapping;
 import org.apache.paimon.utils.FormatReaderMapping.Builder;
@@ -62,6 +61,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -172,11 +172,12 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
             Builder formatBuilder)
             throws IOException {
         List<FieldBunch> fieldsFiles =
-                splitFieldBunch(
+                splitFieldBunches(
                         needMergeFiles,
                         file -> {
                             checkArgument(
-                                    file.isBlobFile(), "Only blob file need to 
call this method.");
+                                    isBlobFile(file.fileName()),
+                                    "Only blob file need to call this 
method.");
                             return schemaFetcher
                                     .apply(file.schemaId())
                                     .logicalRowType()
@@ -185,14 +186,14 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                         });
 
         long rowCount = fieldsFiles.get(0).rowCount();
-        long firstRowId = fieldsFiles.get(0).firstRowId();
+        long firstRowId = fieldsFiles.get(0).files().get(0).firstRowId();
 
-        for (FieldBunch files : fieldsFiles) {
+        for (FieldBunch bunch : fieldsFiles) {
             checkArgument(
-                    files.rowCount() == rowCount,
+                    bunch.rowCount() == rowCount,
                     "All files in a field merge split should have the same row 
count.");
             checkArgument(
-                    files.firstRowId() == firstRowId,
+                    bunch.files().get(0).firstRowId() == firstRowId,
                     "All files in a field merge split should have the same 
first row id and could not be null.");
         }
 
@@ -208,10 +209,11 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
         Arrays.fill(fieldOffsets, -1);
 
         for (int i = 0; i < fieldsFiles.size(); i++) {
-            FieldBunch file = fieldsFiles.get(i);
-            String formatIdentifier = file.formatIdentifier();
-            long schemaId = file.schemaId();
-            TableSchema dataSchema = 
schemaFetcher.apply(schemaId).project(file.writeCols());
+            FieldBunch bunch = fieldsFiles.get(i);
+            DataFileMeta firstFile = bunch.files().get(0);
+            String formatIdentifier = 
DataFilePathFactory.formatIdentifier(firstFile.fileName());
+            long schemaId = firstFile.schemaId();
+            TableSchema dataSchema = 
schemaFetcher.apply(schemaId).project(firstFile.writeCols());
             int[] fieldIds =
                     
SpecialFields.rowTypeWithRowTracking(dataSchema.logicalRowType()).getFields()
                             .stream()
@@ -245,7 +247,7 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                         
readFields.stream().map(DataField::name).collect(Collectors.toList());
                 FormatReaderMapping formatReaderMapping =
                         formatReaderMappings.computeIfAbsent(
-                                new FormatKey(file.schemaId(), 
formatIdentifier, readFieldNames),
+                                new FormatKey(schemaId, formatIdentifier, 
readFieldNames),
                                 key ->
                                         formatBuilder.build(
                                                 formatIdentifier,
@@ -256,7 +258,10 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                 fileRecordReaders[i] =
                         new ForceSingleBatchReader(
                                 createFileReader(
-                                        partition, file, dataFilePathFactory, 
formatReaderMapping));
+                                        partition,
+                                        bunch,
+                                        dataFilePathFactory,
+                                        formatReaderMapping));
             }
         }
 
@@ -296,16 +301,16 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
 
     private RecordReader<InternalRow> createFileReader(
             BinaryRow partition,
-            FieldBunch files,
+            FieldBunch bunch,
             DataFilePathFactory dataFilePathFactory,
             FormatReaderMapping formatReaderMapping)
             throws IOException {
-        if (files.size() == 1) {
+        if (bunch.files().size() == 1) {
             return createFileReader(
-                    partition, files.getFirstFile(), dataFilePathFactory, 
formatReaderMapping);
+                    partition, bunch.files().get(0), dataFilePathFactory, 
formatReaderMapping);
         }
         List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
-        for (DataFileMeta file : files.files()) {
+        for (DataFileMeta file : bunch.files()) {
             FormatReaderContext formatReaderContext =
                     new FormatReaderContext(
                             fileIO, dataFilePathFactory.toPath(file), 
file.fileSize(), null);
@@ -350,13 +355,13 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
     }
 
     @VisibleForTesting
-    public static List<FieldBunch> splitFieldBunch(
+    public static List<FieldBunch> splitFieldBunches(
             List<DataFileMeta> needMergeFiles, Function<DataFileMeta, Integer> 
blobFileToFieldId) {
         List<FieldBunch> fieldsFiles = new ArrayList<>();
         Map<Integer, BlobBunch> blobBunchMap = new HashMap<>();
         long rowCount = -1;
         for (DataFileMeta file : needMergeFiles) {
-            if (file.isBlobFile()) {
+            if (isBlobFile(file.fileName())) {
                 int fieldId = blobFileToFieldId.apply(file);
                 final long expectedRowCount = rowCount;
                 blobBunchMap
@@ -364,83 +369,50 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                         .add(file);
             } else {
                 // Normal file, just add it to the current merge split
-                fieldsFiles.add(FieldBunch.file(file));
+                fieldsFiles.add(new DataBunch(file));
                 rowCount = file.rowCount();
             }
         }
-        blobBunchMap.values().forEach(blobBunch -> 
fieldsFiles.add(FieldBunch.blob(blobBunch)));
+        fieldsFiles.addAll(blobBunchMap.values());
         return fieldsFiles;
     }
 
-    /** Files for one field. */
-    public static class FieldBunch {
-        final Either<DataFileMeta, BlobBunch> fileOrBlob;
-
-        FieldBunch(Either<DataFileMeta, BlobBunch> fileOrBlob) {
-            this.fileOrBlob = fileOrBlob;
-        }
-
-        static FieldBunch file(DataFileMeta file) {
-            return new FieldBunch(Either.left(file));
-        }
-
-        static FieldBunch blob(BlobBunch blob) {
-            return new FieldBunch(Either.right(blob));
-        }
+    /** Files for partial field. */
+    public interface FieldBunch {
 
-        long rowCount() {
-            return fileOrBlob.isLeft()
-                    ? fileOrBlob.getLeft().rowCount()
-                    : fileOrBlob.getRight().rowCount();
-        }
+        long rowCount();
 
-        long firstRowId() {
-            return fileOrBlob.isLeft()
-                    ? fileOrBlob.getLeft().firstRowId()
-                    : fileOrBlob.getRight().firstRowId();
-        }
+        List<DataFileMeta> files();
+    }
 
-        List<String> writeCols() {
-            return fileOrBlob.isLeft()
-                    ? fileOrBlob.getLeft().writeCols()
-                    : fileOrBlob.getRight().writeCols();
-        }
+    private static class DataBunch implements FieldBunch {
 
-        String formatIdentifier() {
-            return fileOrBlob.isLeft()
-                    ? 
DataFilePathFactory.formatIdentifier(fileOrBlob.getLeft().fileName())
-                    : "blob";
-        }
+        private final DataFileMeta dataFile;
 
-        long schemaId() {
-            return fileOrBlob.isLeft()
-                    ? fileOrBlob.getLeft().schemaId()
-                    : fileOrBlob.getRight().schemaId();
+        private DataBunch(DataFileMeta dataFile) {
+            this.dataFile = dataFile;
         }
 
-        @VisibleForTesting
-        public int size() {
-            return fileOrBlob.isLeft() ? 1 : 
fileOrBlob.getRight().files.size();
+        @Override
+        public long rowCount() {
+            return dataFile.rowCount();
         }
 
-        DataFileMeta getFirstFile() {
-            return fileOrBlob.isLeft() ? fileOrBlob.getLeft() : 
fileOrBlob.getRight().files.get(0);
-        }
-
-        List<DataFileMeta> files() {
-            return fileOrBlob.isLeft()
-                    ? Collections.singletonList(fileOrBlob.getLeft())
-                    : fileOrBlob.getRight().files;
+        @Override
+        public List<DataFileMeta> files() {
+            return Collections.singletonList(dataFile);
         }
     }
 
     @VisibleForTesting
-    static class BlobBunch {
+    static class BlobBunch implements FieldBunch {
+
+        final List<DataFileMeta> files;
         final long expectedRowCount;
-        List<DataFileMeta> files;
+
         long latestFistRowId = -1;
         long expectedNextFirstRowId = -1;
-        long lastestMaxSequenceNumber = -1;
+        long latestMaxSequenceNumber = -1;
         long rowCount;
 
         BlobBunch(long expectedRowCount) {
@@ -450,12 +422,12 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
         }
 
         void add(DataFileMeta file) {
-            if (!file.isBlobFile()) {
+            if (!isBlobFile(file.fileName())) {
                 throw new IllegalArgumentException("Only blob file can be 
added to a blob bunch.");
             }
 
             if (file.firstRowId() == latestFistRowId) {
-                if (file.maxSequenceNumber() >= lastestMaxSequenceNumber) {
+                if (file.maxSequenceNumber() >= latestMaxSequenceNumber) {
                     throw new IllegalArgumentException(
                             "Blob file with same first row id should have 
decreasing sequence number.");
                 }
@@ -465,7 +437,7 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                 long firstRowId = file.firstRowId();
                 if (firstRowId < expectedNextFirstRowId) {
                     checkArgument(
-                            file.maxSequenceNumber() < 
lastestMaxSequenceNumber,
+                            file.maxSequenceNumber() < latestMaxSequenceNumber,
                             "Blob file with overlapping row id should have 
decreasing sequence number.");
                     return;
                 } else if (firstRowId > expectedNextFirstRowId) {
@@ -487,37 +459,19 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
             checkArgument(
                     rowCount <= expectedRowCount,
                     "Blob files row count exceed the expect " + 
expectedRowCount);
-            this.lastestMaxSequenceNumber = file.maxSequenceNumber();
+            this.latestMaxSequenceNumber = file.maxSequenceNumber();
             this.latestFistRowId = file.firstRowId();
             this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
         }
 
-        long rowCount() {
+        @Override
+        public long rowCount() {
             return rowCount;
         }
 
-        long firstRowId() {
-            if (files.isEmpty()) {
-                return -1;
-            } else {
-                return files.get(0).firstRowId();
-            }
-        }
-
-        List<String> writeCols() {
-            if (files.isEmpty()) {
-                return new ArrayList<>();
-            } else {
-                return files.get(0).writeCols();
-            }
-        }
-
-        long schemaId() {
-            if (files.isEmpty()) {
-                return -1;
-            } else {
-                return files.get(0).schemaId();
-            }
+        @Override
+        public List<DataFileMeta> files() {
+            return files;
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index e311aa0b2e..f41de87aa9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -88,6 +88,7 @@ import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 import static org.apache.paimon.manifest.ManifestEntry.recordCount;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
@@ -1217,7 +1218,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                     "This is a bug, file source field for row-tracking table 
must present.");
             if (entry.file().fileSource().get().equals(FileSource.APPEND)
                     && entry.file().firstRowId() == null) {
-                if (entry.file().isBlobFile()) {
+                if (isBlobFile(entry.file().fileName())) {
                     if (blobStart >= start) {
                         throw new IllegalStateException(
                                 String.format(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
index 18ae25592a..8b758166b5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
@@ -30,6 +30,7 @@ import java.util.function.Function;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Append data evolution table split generator, which implementation of 
{@link SplitGenerator}. */
@@ -86,7 +87,7 @@ public class DataEvolutionSplitGenerator implements 
SplitGenerator {
                                                 value.firstRowId() == null
                                                         ? Long.MIN_VALUE
                                                         : value.firstRowId())
-                        .thenComparingInt(f -> f.isBlobFile() ? 1 : 0)
+                        .thenComparingInt(f -> isBlobFile(f.fileName()) ? 1 : 
0)
                         .thenComparing(
                                 (f1, f2) -> {
                                     // If firstRowId is the same, we should 
read the file with
@@ -106,7 +107,7 @@ public class DataEvolutionSplitGenerator implements 
SplitGenerator {
                 splitByRowId.add(Collections.singletonList(file));
                 continue;
             }
-            if (!file.isBlobFile() && firstRowId != lastRowId) {
+            if (!isBlobFile(file.fileName()) && firstRowId != lastRowId) {
                 if (!currentSplit.isEmpty()) {
                     splitByRowId.add(currentSplit);
                 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 31e19d4f5c..c6567577bf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -57,11 +57,11 @@ public class BlobTableTest extends TableTestBase {
                         .collect(Collectors.toList());
 
         List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
-                DataEvolutionSplitRead.splitFieldBunch(filesMetas, key -> 0);
+                DataEvolutionSplitRead.splitFieldBunches(filesMetas, key -> 0);
 
         assertThat(fieldGroups.size()).isEqualTo(2);
-        assertThat(fieldGroups.get(0).size()).isEqualTo(1);
-        assertThat(fieldGroups.get(1).size()).isEqualTo(8);
+        assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
+        assertThat(fieldGroups.get(1).files().size()).isEqualTo(8);
 
         readDefault(
                 row -> {
@@ -91,10 +91,10 @@ public class BlobTableTest extends TableTestBase {
         assertThat(batches.size()).isEqualTo(2);
         for (List<DataFileMeta> batch : batches) {
             List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
-                    DataEvolutionSplitRead.splitFieldBunch(batch, file -> 0);
+                    DataEvolutionSplitRead.splitFieldBunches(batch, file -> 0);
             assertThat(fieldGroups.size()).isEqualTo(2);
-            assertThat(fieldGroups.get(0).size()).isEqualTo(1);
-            assertThat(fieldGroups.get(1).size()).isEqualTo(8);
+            assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
+            assertThat(fieldGroups.get(1).files().size()).isEqualTo(8);
         }
 
         readDefault(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index ce38d6e4b5..87fcb732ed 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -21,6 +21,8 @@ package org.apache.paimon.operation;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.operation.DataEvolutionSplitRead.BlobBunch;
+import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
 
@@ -32,17 +34,18 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.paimon.operation.DataEvolutionSplitRead.splitFieldBunches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link DataEvolutionSplitRead.BlobBunch}. */
+/** Tests for {@link BlobBunch}. */
 public class DataEvolutionReadTest {
 
-    private DataEvolutionSplitRead.BlobBunch blobBunch;
+    private BlobBunch blobBunch;
 
     @BeforeEach
     public void setUp() {
-        blobBunch = new DataEvolutionSplitRead.BlobBunch(Long.MAX_VALUE);
+        blobBunch = new BlobBunch(Long.MAX_VALUE);
     }
 
     @Test
@@ -54,8 +57,8 @@ public class DataEvolutionReadTest {
         assertThat(blobBunch.files).hasSize(1);
         assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
         assertThat(blobBunch.rowCount()).isEqualTo(100);
-        assertThat(blobBunch.firstRowId()).isEqualTo(0);
-        assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
+        assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
+        
assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
     }
 
     @Test
@@ -70,9 +73,9 @@ public class DataEvolutionReadTest {
         assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
         assertThat(blobBunch.files.get(1)).isEqualTo(blobTail);
         assertThat(blobBunch.rowCount()).isEqualTo(300);
-        assertThat(blobBunch.firstRowId()).isEqualTo(0);
-        assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
-        assertThat(blobBunch.schemaId()).isEqualTo(0L);
+        assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
+        
assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
+        assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L);
     }
 
     @Test
@@ -176,8 +179,8 @@ public class DataEvolutionReadTest {
 
         assertThat(blobBunch.files).hasSize(4);
         assertThat(blobBunch.rowCount()).isEqualTo(1000);
-        assertThat(blobBunch.firstRowId()).isEqualTo(0);
-        assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
+        assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
+        
assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
     }
 
     @Test
@@ -211,11 +214,10 @@ public class DataEvolutionReadTest {
         assertThat(batch.get(8).fileName()).contains("blob4"); // skip
         assertThat(batch.get(9).fileName()).contains("blob8"); // pick
 
-        List<DataEvolutionSplitRead.FieldBunch> fieldBunches =
-                DataEvolutionSplitRead.splitFieldBunch(batch, file -> 0);
+        List<FieldBunch> fieldBunches = splitFieldBunches(batch, file -> 0);
         assertThat(fieldBunches.size()).isEqualTo(2);
 
-        DataEvolutionSplitRead.BlobBunch blobBunch = 
fieldBunches.get(1).fileOrBlob.getRight();
+        BlobBunch blobBunch = (BlobBunch) fieldBunches.get(1);
         assertThat(blobBunch.files).hasSize(4);
         assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
         assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
@@ -262,19 +264,18 @@ public class DataEvolutionReadTest {
 
         List<DataFileMeta> batch = batches.get(0);
 
-        List<DataEvolutionSplitRead.FieldBunch> fieldBunches =
-                DataEvolutionSplitRead.splitFieldBunch(
-                        batch, file -> file.writeCols().get(0).hashCode());
+        List<FieldBunch> fieldBunches =
+                splitFieldBunches(batch, file -> 
file.writeCols().get(0).hashCode());
         assertThat(fieldBunches.size()).isEqualTo(3);
 
-        DataEvolutionSplitRead.BlobBunch blobBunch = 
fieldBunches.get(1).fileOrBlob.getRight();
+        BlobBunch blobBunch = (BlobBunch) fieldBunches.get(1);
         assertThat(blobBunch.files).hasSize(4);
         assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
         assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
         assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
         assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
 
-        blobBunch = fieldBunches.get(2).fileOrBlob.getRight();
+        blobBunch = (BlobBunch) fieldBunches.get(2);
         assertThat(blobBunch.files).hasSize(4);
         assertThat(blobBunch.files.get(0).fileName()).contains("blob15");
         assertThat(blobBunch.files.get(1).fileName()).contains("blob19");
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index f6b5345ebf..29f5df13df 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -47,6 +47,10 @@ public class BlobFileFormat extends FileFormat {
         super(BlobFileFormatFactory.IDENTIFIER);
     }
 
+    public static boolean isBlobFile(String fileName) {
+        return fileName.endsWith("." + BlobFileFormatFactory.IDENTIFIER);
+    }
+
     @Override
     public FormatReaderFactory createReaderFactory(
             RowType dataSchemaRowType,

Reply via email to