This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cac917c10 [core] Refactor BlobFileContext for blob writing
4cac917c10 is described below

commit 4cac917c10bef864e5470fb0cdd2ce69bd7f7d84
Author: JingsongLi <[email protected]>
AuthorDate: Tue Mar 3 15:34:29 2026 +0800

    [core] Refactor BlobFileContext for blob writing
---
 .../java/org/apache/paimon/types/BlobType.java     |  50 ++++++----
 .../org/apache/paimon/append/AppendOnlyWriter.java |  34 ++-----
 .../paimon/append/MultipleBlobFileWriter.java      |   4 +-
 .../paimon/append/RollingBlobFileWriter.java       |  37 ++++----
 .../paimon/operation/BaseAppendFileStoreWrite.java |  31 +++---
 .../apache/paimon/operation/BlobFileContext.java   | 104 +++++++++++++++++++++
 .../org/apache/paimon/schema/SchemaValidation.java |  15 +--
 .../apache/paimon/append/AppendOnlyWriterTest.java |   8 +-
 .../paimon/append/RollingBlobFileWriterTest.java   |  35 ++-----
 .../paimon/io/KeyValueFileReadWriteTest.java       |   8 +-
 10 files changed, 196 insertions(+), 130 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java 
b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
index 4d2d42afda..14a603dd58 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
@@ -19,11 +19,11 @@
 package org.apache.paimon.types;
 
 import org.apache.paimon.annotation.Public;
-import org.apache.paimon.utils.Pair;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Data type of binary large object.
@@ -67,32 +67,42 @@ public final class BlobType extends DataType {
         return visitor.visit(this);
     }
 
-    public static Pair<RowType, RowType> splitBlob(RowType rowType) {
-        return splitBlob(rowType, java.util.Collections.emptySet());
+    /**
+     * Retrieve fields not stored in blob files.
+     *
+     * <p>Blob fields contained in {@code descriptorFields} are treated as 
normal fields (stored
+     * inline as serialized descriptor bytes), while other blob fields are 
treated as blob-file
+     * fields.
+     */
+    public static List<DataField> fieldsNotInBlobFile(
+            RowType rowType, Set<String> descriptorFields) {
+        Set<String> fieldsInBlobFile =
+                fieldsInBlobFile(rowType, descriptorFields).stream()
+                        .map(DataField::name)
+                        .collect(Collectors.toSet());
+        return rowType.getFields().stream()
+                .filter(field -> !fieldsInBlobFile.contains(field.name()))
+                .collect(Collectors.toList());
     }
 
     /**
-     * Split row fields into normal fields and blob-file fields.
+     * Retrieve fields stored in blob files.
      *
+     * <p>Blob fields contained in {@code descriptorFields} are treated as 
normal fields (stored
      * inline as serialized descriptor bytes), while other blob fields are 
treated as blob-file
      * fields.
      */
-    public static Pair<RowType, RowType> splitBlob(
-            RowType rowType, Set<String> blobDescriptorFields) {
-        List<DataField> fields = rowType.getFields();
-        List<DataField> normalFields = new ArrayList<>();
-        List<DataField> blobFields = new ArrayList<>();
-
-        for (DataField field : fields) {
-            DataTypeRoot type = field.type().getTypeRoot();
-            if (type == DataTypeRoot.BLOB && 
!blobDescriptorFields.contains(field.name())) {
-                blobFields.add(field);
-            } else {
-                normalFields.add(field);
-            }
-        }
-
-        return Pair.of(new RowType(normalFields), new RowType(blobFields));
+    public static List<DataField> fieldsInBlobFile(RowType rowType, 
Set<String> descriptorFields) {
+        List<DataField> result = new ArrayList<>();
+        rowType.getFields()
+                .forEach(
+                        field -> {
+                            DataTypeRoot type = field.type().getTypeRoot();
+                            if (type == DataTypeRoot.BLOB
+                                    && 
!descriptorFields.contains(field.name())) {
+                                result.add(field);
+                            }
+                        });
+        return result;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 4c2ebebc39..cd3f4f9c70 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compression.CompressOptions;
-import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.RowBuffer;
@@ -39,9 +38,9 @@ import org.apache.paimon.io.RowDataRollingFileWriter;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.operation.BlobFileContext;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BatchRecordWriter;
 import org.apache.paimon.utils.CommitIncrement;
@@ -60,7 +59,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 
@@ -83,6 +81,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
     private final boolean forceCompact;
     private final boolean asyncFileWrite;
     private final boolean statsDenseStore;
+    @Nullable private final BlobFileContext blobContext;
     private final List<DataFileMeta> newFiles;
     private final List<DataFileMeta> deletedFiles;
     private final List<DataFileMeta> compactBefore;
@@ -94,10 +93,6 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
     @Nullable private final IOManager ioManager;
     private final FileIndexOptions fileIndexOptions;
     private final MemorySize maxDiskSize;
-    @Nullable private final BlobConsumer blobConsumer;
-    private final Set<String> blobDescriptorFields;
-    private final Set<String> blobExternalStorageFields;
-    @Nullable private final String blobExternalStoragePath;
 
     @Nullable private CompactDeletionFile compactDeletionFile;
     private SinkWriter<InternalRow> sinkWriter;
@@ -127,11 +122,8 @@ public class AppendOnlyWriter implements 
BatchRecordWriter, MemoryOwner {
             FileIndexOptions fileIndexOptions,
             boolean asyncFileWrite,
             boolean statsDenseStore,
-            @Nullable BlobConsumer blobConsumer,
-            Set<String> blobDescriptorFields,
-            Set<String> blobExternalStorageFields,
-            @Nullable String blobExternalStoragePath,
-            boolean dataEvolutionEnabled) {
+            boolean dataEvolutionEnabled,
+            @Nullable BlobFileContext blobContext) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
@@ -145,7 +137,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
         this.forceCompact = forceCompact;
         this.asyncFileWrite = asyncFileWrite;
         this.statsDenseStore = statsDenseStore;
-        this.blobConsumer = blobConsumer;
+        this.blobContext = blobContext;
         this.newFiles = new ArrayList<>();
         this.deletedFiles = new ArrayList<>();
         this.compactBefore = new ArrayList<>();
@@ -159,9 +151,6 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
         this.statsCollectorFactories = statsCollectorFactories;
         this.maxDiskSize = maxDiskSize;
         this.fileIndexOptions = fileIndexOptions;
-        this.blobDescriptorFields = blobDescriptorFields;
-        this.blobExternalStorageFields = blobExternalStorageFields;
-        this.blobExternalStoragePath = blobExternalStoragePath;
 
         this.sinkWriter =
                 useWriteBuffer
@@ -313,8 +302,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
     }
 
     private RollingFileWriter<InternalRow, DataFileMeta> 
createRollingRowWriter() {
-        if (BlobType.splitBlob(writeSchema, 
blobDescriptorFields).getRight().getFieldCount() > 0
-                || !blobExternalStorageFields.isEmpty()) {
+        if (blobContext != null) {
             return new RollingBlobFileWriter(
                     fileIO,
                     schemaId,
@@ -328,16 +316,8 @@ public class AppendOnlyWriter implements 
BatchRecordWriter, MemoryOwner {
                     statsCollectorFactories,
                     fileIndexOptions,
                     FileSource.APPEND,
-                    // blob write does not need async write, cause blob write 
is I/O-intensive, not
-                    // CPU-intensive process.
-                    // Async write will cost 64MB more memory per write task, 
blob writes get low
-                    // benefit from async write, but cost a lot.
-                    false,
                     statsDenseStore,
-                    blobConsumer,
-                    blobDescriptorFields,
-                    blobExternalStorageFields,
-                    blobExternalStoragePath);
+                    blobContext);
         }
         return new RowDataRollingFileWriter(
                 fileIO,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
index 92b0566d54..93075b2035 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
@@ -32,7 +32,6 @@ import org.apache.paimon.io.SingleFileWriter;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
-import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LongCounter;
 
@@ -46,6 +45,7 @@ import java.util.Set;
 import java.util.function.Supplier;
 
 import static java.util.Collections.singletonList;
+import static org.apache.paimon.types.BlobType.fieldsInBlobFile;
 
 /** A blob file writer that writes blob files. */
 public class MultipleBlobFileWriter implements Closeable {
@@ -64,7 +64,7 @@ public class MultipleBlobFileWriter implements Closeable {
             long targetFileSize,
             @Nullable BlobConsumer blobConsumer,
             Set<String> blobDescriptorFields) {
-        RowType blobRowType = BlobType.splitBlob(writeSchema, 
blobDescriptorFields).getRight();
+        RowType blobRowType = new RowType(fieldsInBlobFile(writeSchema, 
blobDescriptorFields));
         this.blobWriters = new ArrayList<>();
         for (String blobFieldName : blobRowType.getFieldNames()) {
             BlobFileFormat blobFileFormat = new BlobFileFormat();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index 2fc794e7e0..eb4ec0a0c1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.append;
 
-import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
@@ -31,7 +30,8 @@ import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.io.RowDataFileWriter;
 import org.apache.paimon.io.SingleFileWriter;
 import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.types.BlobType;
+import org.apache.paimon.operation.BlobFileContext;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LongCounter;
 import org.apache.paimon.utils.Preconditions;
@@ -48,9 +48,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Supplier;
 
+import static org.apache.paimon.types.BlobType.fieldsNotInBlobFile;
+
 /**
  * A rolling file writer that handles both normal data and blob data. This 
writer creates separate
  * files for normal columns and blob columns, managing their lifecycle and 
ensuring consistency
@@ -84,7 +85,6 @@ public class RollingBlobFileWriter implements 
RollingFileWriter<InternalRow, Dat
             writerFactory;
     private final Supplier<MultipleBlobFileWriter> blobWriterFactory;
     private final long targetFileSize;
-    private final Set<String> blobDescriptorFields;
     @Nullable private final ExternalStorageBlobWriter 
externalStorageBlobWriter;
 
     // State management
@@ -110,17 +110,18 @@ public class RollingBlobFileWriter implements 
RollingFileWriter<InternalRow, Dat
             StatsCollectorFactories statsCollectorFactories,
             FileIndexOptions fileIndexOptions,
             FileSource fileSource,
-            boolean asyncFileWrite,
             boolean statsDenseStore,
-            @Nullable BlobConsumer blobConsumer,
-            Set<String> blobDescriptorFields,
-            Set<String> blobExternalStorageFields,
-            @Nullable String blobExternalStoragePath) {
+            BlobFileContext context) {
         // Initialize basic fields
         this.targetFileSize = targetFileSize;
         this.results = new ArrayList<>();
         this.closedWriters = new ArrayList<>();
-        this.blobDescriptorFields = blobDescriptorFields;
+
+        // Blob writing does not need async write, because it is 
I/O-intensive, not a
+        // CPU-intensive process.
+        // Async write would cost 64MB more memory per write task; blob 
writes gain little
+        // benefit from async write but pay a high cost.
+        boolean asyncFileWrite = false;
 
         // Initialize writer factory for normal data
         this.writerFactory =
@@ -128,7 +129,7 @@ public class RollingBlobFileWriter implements 
RollingFileWriter<InternalRow, Dat
                         fileIO,
                         schemaId,
                         fileFormat,
-                        BlobType.splitBlob(writeSchema, 
blobDescriptorFields).getLeft(),
+                        fieldsNotInBlobFile(writeSchema, 
context.blobDescriptorFields()),
                         writeSchema,
                         pathFactory,
                         seqNumCounterSupplier,
@@ -152,18 +153,18 @@ public class RollingBlobFileWriter implements 
RollingFileWriter<InternalRow, Dat
                                 asyncFileWrite,
                                 statsDenseStore,
                                 blobTargetFileSize,
-                                blobConsumer,
-                                this.blobDescriptorFields);
+                                context.blobConsumer(),
+                                context.blobDescriptorFields());
 
         // Initialize writer for descriptor fields backed by external storage 
if needed.
-        if (!blobExternalStorageFields.isEmpty()) {
+        if (!context.blobExternalStorageFields().isEmpty()) {
             this.externalStorageBlobWriter =
                     new ExternalStorageBlobWriter(
                             fileIO,
                             schemaId,
                             writeSchema,
-                            blobExternalStorageFields,
-                            blobExternalStoragePath,
+                            context.blobExternalStorageFields(),
+                            context.blobExternalStoragePath(),
                             pathFactory,
                             seqNumCounterSupplier,
                             fileSource,
@@ -182,7 +183,7 @@ public class RollingBlobFileWriter implements 
RollingFileWriter<InternalRow, Dat
                     FileIO fileIO,
                     long schemaId,
                     FileFormat fileFormat,
-                    RowType normalRowType,
+                    List<DataField> fieldsNotInBlobFile,
                     RowType writeSchema,
                     DataFilePathFactory pathFactory,
                     Supplier<LongCounter> seqNumCounterSupplier,
@@ -192,7 +193,7 @@ public class RollingBlobFileWriter implements 
RollingFileWriter<InternalRow, Dat
                     FileSource fileSource,
                     boolean asyncFileWrite,
                     boolean statsDenseStore) {
-
+        RowType normalRowType = new RowType(fieldsNotInBlobFile);
         List<String> normalColumnNames = normalRowType.getFieldNames();
         int[] projectionNormalFields = 
writeSchema.projectIndexes(normalColumnNames);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index b88935bfa1..a714658f2c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -58,13 +58,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.apache.paimon.format.FileFormat.fileFormat;
-import static org.apache.paimon.types.DataTypeRoot.BLOB;
 import static 
org.apache.paimon.utils.StatsCollectorFactories.createStatsFactories;
 
 /** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
@@ -81,14 +79,10 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
     private final FileIndexOptions fileIndexOptions;
     private final RowType rowType;
 
+    private @Nullable BlobFileContext blobContext;
     private RowType writeType;
     private @Nullable List<String> writeCols;
     private boolean forceBufferSpill = false;
-    private boolean withBlob;
-    private @Nullable BlobConsumer blobConsumer;
-    private final Set<String> blobDescriptorFields;
-    private final Set<String> blobExternalStorageFields;
-    @Nullable private final String blobExternalStoragePath;
 
     public BaseAppendFileStoreWrite(
             FileIO fileIO,
@@ -111,17 +105,15 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
         this.writeCols = null;
         this.fileFormat = fileFormat(options);
         this.pathFactory = pathFactory;
-        this.withBlob = rowType.getFieldTypes().stream().anyMatch(t -> 
t.is(BLOB));
-        this.blobDescriptorFields = options.blobDescriptorField();
-        this.blobExternalStorageFields = options.blobExternalStorageField();
-        this.blobExternalStoragePath = options.blobExternalStoragePath();
-
+        this.blobContext = BlobFileContext.create(rowType, options);
         this.fileIndexOptions = options.indexColumnsOptions();
     }
 
     @Override
     public BaseAppendFileStoreWrite withBlobConsumer(BlobConsumer 
blobConsumer) {
-        this.blobConsumer = blobConsumer;
+        if (blobContext != null) {
+            blobContext = blobContext.withBlobConsumer(blobConsumer);
+        }
         return this;
     }
 
@@ -159,17 +151,16 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
                 fileIndexOptions,
                 options.asyncFileWrite(),
                 options.statsDenseStore(),
-                blobConsumer,
-                blobDescriptorFields,
-                blobExternalStorageFields,
-                blobExternalStoragePath,
-                options.dataEvolutionEnabled());
+                options.dataEvolutionEnabled(),
+                blobContext);
     }
 
     @Override
     public void withWriteType(RowType writeType) {
         this.writeType = writeType;
-        this.withBlob = writeType.getFieldTypes().stream().anyMatch(t -> 
t.is(BLOB));
+        if (blobContext != null) {
+            blobContext = blobContext.withWriteType(writeType);
+        }
         int fullCount = rowType.getFieldCount();
         List<String> fullNames = rowType.getFieldNames();
         this.writeCols = writeType.getFieldNames();
@@ -302,7 +293,7 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
         if (ioManager == null) {
             return;
         }
-        if (withBlob) {
+        if (blobContext != null) {
             return;
         }
         if (forceBufferSpill) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
new file mode 100644
index 0000000000..2069574579
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BlobConsumer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+
+import static org.apache.paimon.types.DataTypeRoot.BLOB;
+
+/** Context holding configuration and state for writing blob files. */
+public class BlobFileContext {
+
+    private final Set<String> blobDescriptorFields;
+    private final Set<String> blobExternalStorageFields;
+    @Nullable private final String blobExternalStoragePath;
+
+    private @Nullable BlobConsumer blobConsumer;
+
+    private BlobFileContext(
+            Set<String> blobDescriptorFields,
+            Set<String> blobExternalStorageFields,
+            @Nullable String blobExternalStoragePath) {
+        this.blobDescriptorFields = blobDescriptorFields;
+        this.blobExternalStorageFields = blobExternalStorageFields;
+        this.blobExternalStoragePath = blobExternalStoragePath;
+    }
+
+    @Nullable
+    public static BlobFileContext create(RowType rowType, CoreOptions options) 
{
+        if (rowType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) {
+            return null;
+        }
+        Set<String> descriptorFields = options.blobDescriptorField();
+        Set<String> externalStorageField = options.blobExternalStorageField();
+        String externalStoragePath = options.blobExternalStoragePath();
+        boolean requireBlobFile = false;
+        for (DataField field : rowType.getFields()) {
+            DataTypeRoot type = field.type().getTypeRoot();
+            if (type == DataTypeRoot.BLOB
+                    && (!descriptorFields.contains(field.name())
+                            || externalStorageField.contains(field.name()))) {
+                requireBlobFile = true;
+                break;
+            }
+        }
+        if (!requireBlobFile) {
+            return null;
+        }
+        return new BlobFileContext(descriptorFields, externalStorageField, 
externalStoragePath);
+    }
+
+    public BlobFileContext withBlobConsumer(BlobConsumer blobConsumer) {
+        this.blobConsumer = blobConsumer;
+        return this;
+    }
+
+    public BlobFileContext withWriteType(RowType writeType) {
+        if (writeType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) {
+            return null;
+        }
+        return this;
+    }
+
+    public Set<String> blobDescriptorFields() {
+        return blobDescriptorFields;
+    }
+
+    public Set<String> blobExternalStorageFields() {
+        return blobExternalStorageFields;
+    }
+
+    @Nullable
+    public String blobExternalStoragePath() {
+        return blobExternalStoragePath;
+    }
+
+    @Nullable
+    public BlobConsumer blobConsumer() {
+        return blobConsumer;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 4aac1257f7..ebb5a0f9c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -31,7 +31,6 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeRoot;
@@ -41,7 +40,6 @@ import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
@@ -79,6 +77,7 @@ import static 
org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
 import static 
org.apache.paimon.table.PrimaryKeyTableUtils.createMergeFunctionFactory;
 import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
 import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
+import static org.apache.paimon.types.BlobType.fieldsNotInBlobFile;
 import static org.apache.paimon.types.DataTypeRoot.ARRAY;
 import static org.apache.paimon.types.DataTypeRoot.MAP;
 import static org.apache.paimon.types.DataTypeRoot.MULTISET;
@@ -163,7 +162,7 @@ public class SchemaValidation {
         Set<String> blobDescriptorFields = 
validateBlobDescriptorFields(tableRowType, options);
         validateBlobExternalStorageFields(tableRowType, options, 
blobDescriptorFields);
         fileFormat.validateDataFields(
-                BlobType.splitBlob(tableRowType, 
blobDescriptorFields).getLeft());
+                new RowType(fieldsNotInBlobFile(tableRowType, 
blobDescriptorFields)));
 
         // Check column names in schema
         schema.fieldNames()
@@ -611,14 +610,18 @@ public class SchemaValidation {
                     "Data evolution config must disabled with 
clustering.incremental");
         }
 
-        Pair<RowType, RowType> normalAndBlobType = 
BlobType.splitBlob(schema.logicalRowType());
-        List<String> blobNames = normalAndBlobType.getRight().getFieldNames();
+        List<DataField> fields = schema.fields();
+        List<String> blobNames =
+                fields.stream()
+                        .filter(field -> field.type().is(DataTypeRoot.BLOB))
+                        .map(DataField::name)
+                        .collect(Collectors.toList());
         if (!blobNames.isEmpty()) {
             checkArgument(
                     options.dataEvolutionEnabled(),
                     "Data evolution config must enabled for table with BLOB 
type column.");
             checkArgument(
-                    normalAndBlobType.getLeft().getFieldCount() > 0,
+                    fields.size() > blobNames.size(),
                     "Table with BLOB type column must have other normal 
columns.");
             checkArgument(
                     
blobNames.stream().noneMatch(schema.partitionKeys()::contains),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index ad0ca18f19..b2d820adf8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -41,6 +41,7 @@ import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.operation.BlobFileContext;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
@@ -712,11 +713,8 @@ public class AppendOnlyWriterTest {
                         new FileIndexOptions(),
                         true,
                         false,
-                        null,
-                        options.blobDescriptorField(),
-                        options.blobExternalStorageField(),
-                        options.blobExternalStoragePath(),
-                        options.dataEvolutionEnabled());
+                        options.dataEvolutionEnabled(),
+                        BlobFileContext.create(AppendOnlyWriterTest.SCHEMA, 
options));
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
         return Pair.of(writer, compactManager.allFiles());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index 2b37f7dc72..11095cb7d0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.operation.BlobFileContext;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -43,7 +44,6 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
@@ -91,6 +91,7 @@ public class RollingBlobFileWriterTest {
         seqNumCounter = new LongCounter();
 
         // Initialize the writer
+        CoreOptions options = new CoreOptions(new Options());
         writer =
                 new RollingBlobFileWriter(
                         fileIO,
@@ -102,15 +103,11 @@ public class RollingBlobFileWriterTest {
                         pathFactory,
                         () -> seqNumCounter,
                         COMPRESSION,
-                        new StatsCollectorFactories(new CoreOptions(new 
Options())),
+                        new StatsCollectorFactories(options),
                         new FileIndexOptions(),
                         FileSource.APPEND,
-                        false, // asyncFileWrite
                         false, // statsDenseStore
-                        null,
-                        Collections.emptySet(),
-                        Collections.emptySet(),
-                        null);
+                        BlobFileContext.create(SCHEMA, options));
     }
 
     @Test
@@ -208,12 +205,8 @@ public class RollingBlobFileWriterTest {
                         new StatsCollectorFactories(new CoreOptions(new 
Options())),
                         new FileIndexOptions(),
                         FileSource.APPEND,
-                        false, // asyncFileWrite
                         false, // statsDenseStore
-                        null,
-                        Collections.emptySet(),
-                        Collections.emptySet(),
-                        null);
+                        BlobFileContext.create(SCHEMA, new CoreOptions(new 
Options())));
 
         // Create large blob data that will exceed the blob target file size
         byte[] largeBlobData = new byte[3 * 1024 * 1024]; // 3 MB blob data
@@ -288,12 +281,8 @@ public class RollingBlobFileWriterTest {
                         new StatsCollectorFactories(new CoreOptions(new 
Options())),
                         new FileIndexOptions(),
                         FileSource.APPEND,
-                        false, // asyncFileWrite
                         false, // statsDenseStore
-                        null,
-                        Collections.emptySet(),
-                        Collections.emptySet(),
-                        null);
+                        BlobFileContext.create(SCHEMA, new CoreOptions(new 
Options())));
 
         // Create blob data that will trigger rolling
         byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -370,12 +359,8 @@ public class RollingBlobFileWriterTest {
                         new StatsCollectorFactories(new CoreOptions(new 
Options())),
                         new FileIndexOptions(),
                         FileSource.APPEND,
-                        false, // asyncFileWrite
                         false, // statsDenseStore
-                        null,
-                        Collections.emptySet(),
-                        Collections.emptySet(),
-                        null);
+                        BlobFileContext.create(SCHEMA, new CoreOptions(new 
Options())));
 
         // Create blob data that will trigger rolling
         byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -590,12 +575,8 @@ public class RollingBlobFileWriterTest {
                         new StatsCollectorFactories(new CoreOptions(new 
Options())),
                         new FileIndexOptions(),
                         FileSource.APPEND,
-                        false, // asyncFileWrite
                         false, // statsDenseStore
-                        null,
-                        Collections.emptySet(),
-                        Collections.emptySet(),
-                        null);
+                        BlobFileContext.create(SCHEMA, new CoreOptions(new 
Options())));
 
         // Write data
         for (int i = 0; i < 3; i++) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 5dc5635bea..e350cb2a9a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -42,6 +42,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.operation.BlobFileContext;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
@@ -291,11 +292,8 @@ public class KeyValueFileReadWriteTest {
                         new FileIndexOptions(),
                         true,
                         false,
-                        null,
-                        options.blobDescriptorField(),
-                        options.blobExternalStorageField(),
-                        options.blobExternalStoragePath(),
-                        options.dataEvolutionEnabled());
+                        options.dataEvolutionEnabled(),
+                        BlobFileContext.create(schema, options));
         appendOnlyWriter.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
         appendOnlyWriter.write(


Reply via email to