This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4cac917c10 [core] Refactor BlobFileContext for blob writing
4cac917c10 is described below
commit 4cac917c10bef864e5470fb0cdd2ce69bd7f7d84
Author: JingsongLi <[email protected]>
AuthorDate: Tue Mar 3 15:34:29 2026 +0800
[core] Refactor BlobFileContext for blob writing
---
.../java/org/apache/paimon/types/BlobType.java | 50 ++++++----
.../org/apache/paimon/append/AppendOnlyWriter.java | 34 ++-----
.../paimon/append/MultipleBlobFileWriter.java | 4 +-
.../paimon/append/RollingBlobFileWriter.java | 37 ++++----
.../paimon/operation/BaseAppendFileStoreWrite.java | 31 +++---
.../apache/paimon/operation/BlobFileContext.java | 104 +++++++++++++++++++++
.../org/apache/paimon/schema/SchemaValidation.java | 15 +--
.../apache/paimon/append/AppendOnlyWriterTest.java | 8 +-
.../paimon/append/RollingBlobFileWriterTest.java | 35 ++-----
.../paimon/io/KeyValueFileReadWriteTest.java | 8 +-
10 files changed, 196 insertions(+), 130 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
index 4d2d42afda..14a603dd58 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
@@ -19,11 +19,11 @@
package org.apache.paimon.types;
import org.apache.paimon.annotation.Public;
-import org.apache.paimon.utils.Pair;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* Data type of binary large object.
@@ -67,32 +67,42 @@ public final class BlobType extends DataType {
return visitor.visit(this);
}
- public static Pair<RowType, RowType> splitBlob(RowType rowType) {
- return splitBlob(rowType, java.util.Collections.emptySet());
+ /**
+ * Retrieve fields not stored in blob files.
+ *
+ * <p>Blob fields contained in {@code descriptorFields} are treated as
normal fields (stored
+ * inline as serialized descriptor bytes), while other blob fields are
treated as blob-file
+ * fields.
+ */
+ public static List<DataField> fieldsNotInBlobFile(
+ RowType rowType, Set<String> descriptorFields) {
+ Set<String> fieldsInBlobFile =
+ fieldsInBlobFile(rowType, descriptorFields).stream()
+ .map(DataField::name)
+ .collect(Collectors.toSet());
+ return rowType.getFields().stream()
+ .filter(field -> !fieldsInBlobFile.contains(field.name()))
+ .collect(Collectors.toList());
}
/**
- * Split row fields into normal fields and blob-file fields.
+ * Retrieve fields stored in blob files.
*
* <p>Blob fields contained in {@code descriptorFields} are treated as
normal fields (stored
* inline as serialized descriptor bytes), while other blob fields are
treated as blob-file
* fields.
*/
- public static Pair<RowType, RowType> splitBlob(
- RowType rowType, Set<String> blobDescriptorFields) {
- List<DataField> fields = rowType.getFields();
- List<DataField> normalFields = new ArrayList<>();
- List<DataField> blobFields = new ArrayList<>();
-
- for (DataField field : fields) {
- DataTypeRoot type = field.type().getTypeRoot();
- if (type == DataTypeRoot.BLOB &&
!blobDescriptorFields.contains(field.name())) {
- blobFields.add(field);
- } else {
- normalFields.add(field);
- }
- }
-
- return Pair.of(new RowType(normalFields), new RowType(blobFields));
+ public static List<DataField> fieldsInBlobFile(RowType rowType,
Set<String> descriptorFields) {
+ List<DataField> result = new ArrayList<>();
+ rowType.getFields()
+ .forEach(
+ field -> {
+ DataTypeRoot type = field.type().getTypeRoot();
+ if (type == DataTypeRoot.BLOB
+ &&
!descriptorFields.contains(field.name())) {
+ result.add(field);
+ }
+ });
+ return result;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 4c2ebebc39..cd3f4f9c70 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compression.CompressOptions;
-import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
@@ -39,9 +38,9 @@ import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.operation.BlobFileContext;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BatchRecordWriter;
import org.apache.paimon.utils.CommitIncrement;
@@ -60,7 +59,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
@@ -83,6 +81,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
private final boolean forceCompact;
private final boolean asyncFileWrite;
private final boolean statsDenseStore;
+ @Nullable private final BlobFileContext blobContext;
private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> compactBefore;
@@ -94,10 +93,6 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
@Nullable private final IOManager ioManager;
private final FileIndexOptions fileIndexOptions;
private final MemorySize maxDiskSize;
- @Nullable private final BlobConsumer blobConsumer;
- private final Set<String> blobDescriptorFields;
- private final Set<String> blobExternalStorageFields;
- @Nullable private final String blobExternalStoragePath;
@Nullable private CompactDeletionFile compactDeletionFile;
private SinkWriter<InternalRow> sinkWriter;
@@ -127,11 +122,8 @@ public class AppendOnlyWriter implements
BatchRecordWriter, MemoryOwner {
FileIndexOptions fileIndexOptions,
boolean asyncFileWrite,
boolean statsDenseStore,
- @Nullable BlobConsumer blobConsumer,
- Set<String> blobDescriptorFields,
- Set<String> blobExternalStorageFields,
- @Nullable String blobExternalStoragePath,
- boolean dataEvolutionEnabled) {
+ boolean dataEvolutionEnabled,
+ @Nullable BlobFileContext blobContext) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -145,7 +137,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
this.forceCompact = forceCompact;
this.asyncFileWrite = asyncFileWrite;
this.statsDenseStore = statsDenseStore;
- this.blobConsumer = blobConsumer;
+ this.blobContext = blobContext;
this.newFiles = new ArrayList<>();
this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
@@ -159,9 +151,6 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
this.statsCollectorFactories = statsCollectorFactories;
this.maxDiskSize = maxDiskSize;
this.fileIndexOptions = fileIndexOptions;
- this.blobDescriptorFields = blobDescriptorFields;
- this.blobExternalStorageFields = blobExternalStorageFields;
- this.blobExternalStoragePath = blobExternalStoragePath;
this.sinkWriter =
useWriteBuffer
@@ -313,8 +302,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
}
private RollingFileWriter<InternalRow, DataFileMeta>
createRollingRowWriter() {
- if (BlobType.splitBlob(writeSchema,
blobDescriptorFields).getRight().getFieldCount() > 0
- || !blobExternalStorageFields.isEmpty()) {
+ if (blobContext != null) {
return new RollingBlobFileWriter(
fileIO,
schemaId,
@@ -328,16 +316,8 @@ public class AppendOnlyWriter implements
BatchRecordWriter, MemoryOwner {
statsCollectorFactories,
fileIndexOptions,
FileSource.APPEND,
- // blob write does not need async write, cause blob write
is I/O-intensive, not
- // CPU-intensive process.
- // Async write will cost 64MB more memory per write task,
blob writes get low
- // benefit from async write, but cost a lot.
- false,
statsDenseStore,
- blobConsumer,
- blobDescriptorFields,
- blobExternalStorageFields,
- blobExternalStoragePath);
+ blobContext);
}
return new RowDataRollingFileWriter(
fileIO,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
index 92b0566d54..93075b2035 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
@@ -32,7 +32,6 @@ import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
import org.apache.paimon.statistics.SimpleColStatsCollector;
-import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;
@@ -46,6 +45,7 @@ import java.util.Set;
import java.util.function.Supplier;
import static java.util.Collections.singletonList;
+import static org.apache.paimon.types.BlobType.fieldsInBlobFile;
/** A blob file writer that writes blob files. */
public class MultipleBlobFileWriter implements Closeable {
@@ -64,7 +64,7 @@ public class MultipleBlobFileWriter implements Closeable {
long targetFileSize,
@Nullable BlobConsumer blobConsumer,
Set<String> blobDescriptorFields) {
- RowType blobRowType = BlobType.splitBlob(writeSchema,
blobDescriptorFields).getRight();
+ RowType blobRowType = new RowType(fieldsInBlobFile(writeSchema,
blobDescriptorFields));
this.blobWriters = new ArrayList<>();
for (String blobFieldName : blobRowType.getFieldNames()) {
BlobFileFormat blobFileFormat = new BlobFileFormat();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index 2fc794e7e0..eb4ec0a0c1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -18,7 +18,6 @@
package org.apache.paimon.append;
-import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
@@ -31,7 +30,8 @@ import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.RowDataFileWriter;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.types.BlobType;
+import org.apache.paimon.operation.BlobFileContext;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;
import org.apache.paimon.utils.Preconditions;
@@ -48,9 +48,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.function.Supplier;
+import static org.apache.paimon.types.BlobType.fieldsNotInBlobFile;
+
/**
* A rolling file writer that handles both normal data and blob data. This
writer creates separate
* files for normal columns and blob columns, managing their lifecycle and
ensuring consistency
@@ -84,7 +85,6 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
writerFactory;
private final Supplier<MultipleBlobFileWriter> blobWriterFactory;
private final long targetFileSize;
- private final Set<String> blobDescriptorFields;
@Nullable private final ExternalStorageBlobWriter
externalStorageBlobWriter;
// State management
@@ -110,17 +110,18 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
StatsCollectorFactories statsCollectorFactories,
FileIndexOptions fileIndexOptions,
FileSource fileSource,
- boolean asyncFileWrite,
boolean statsDenseStore,
- @Nullable BlobConsumer blobConsumer,
- Set<String> blobDescriptorFields,
- Set<String> blobExternalStorageFields,
- @Nullable String blobExternalStoragePath) {
+ BlobFileContext context) {
// Initialize basic fields
this.targetFileSize = targetFileSize;
this.results = new ArrayList<>();
this.closedWriters = new ArrayList<>();
- this.blobDescriptorFields = blobDescriptorFields;
+
+        // Blob writing does not need async write, because blob writing is
an I/O-intensive, not
+        // a CPU-intensive process.
+        // Async write costs 64MB more memory per write task; blob writes
get little
+        // benefit from async write, but cost a lot.
+ boolean asyncFileWrite = false;
// Initialize writer factory for normal data
this.writerFactory =
@@ -128,7 +129,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
fileIO,
schemaId,
fileFormat,
- BlobType.splitBlob(writeSchema,
blobDescriptorFields).getLeft(),
+ fieldsNotInBlobFile(writeSchema,
context.blobDescriptorFields()),
writeSchema,
pathFactory,
seqNumCounterSupplier,
@@ -152,18 +153,18 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
asyncFileWrite,
statsDenseStore,
blobTargetFileSize,
- blobConsumer,
- this.blobDescriptorFields);
+ context.blobConsumer(),
+ context.blobDescriptorFields());
// Initialize writer for descriptor fields backed by external storage
if needed.
- if (!blobExternalStorageFields.isEmpty()) {
+ if (!context.blobExternalStorageFields().isEmpty()) {
this.externalStorageBlobWriter =
new ExternalStorageBlobWriter(
fileIO,
schemaId,
writeSchema,
- blobExternalStorageFields,
- blobExternalStoragePath,
+ context.blobExternalStorageFields(),
+ context.blobExternalStoragePath(),
pathFactory,
seqNumCounterSupplier,
fileSource,
@@ -182,7 +183,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
FileIO fileIO,
long schemaId,
FileFormat fileFormat,
- RowType normalRowType,
+ List<DataField> fieldsNotInBlobFile,
RowType writeSchema,
DataFilePathFactory pathFactory,
Supplier<LongCounter> seqNumCounterSupplier,
@@ -192,7 +193,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
FileSource fileSource,
boolean asyncFileWrite,
boolean statsDenseStore) {
-
+ RowType normalRowType = new RowType(fieldsNotInBlobFile);
List<String> normalColumnNames = normalRowType.getFieldNames();
int[] projectionNormalFields =
writeSchema.projectIndexes(normalColumnNames);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index b88935bfa1..a714658f2c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -58,13 +58,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.paimon.format.FileFormat.fileFormat;
-import static org.apache.paimon.types.DataTypeRoot.BLOB;
import static
org.apache.paimon.utils.StatsCollectorFactories.createStatsFactories;
/** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
@@ -81,14 +79,10 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
private final FileIndexOptions fileIndexOptions;
private final RowType rowType;
+ private @Nullable BlobFileContext blobContext;
private RowType writeType;
private @Nullable List<String> writeCols;
private boolean forceBufferSpill = false;
- private boolean withBlob;
- private @Nullable BlobConsumer blobConsumer;
- private final Set<String> blobDescriptorFields;
- private final Set<String> blobExternalStorageFields;
- @Nullable private final String blobExternalStoragePath;
public BaseAppendFileStoreWrite(
FileIO fileIO,
@@ -111,17 +105,15 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
this.writeCols = null;
this.fileFormat = fileFormat(options);
this.pathFactory = pathFactory;
- this.withBlob = rowType.getFieldTypes().stream().anyMatch(t ->
t.is(BLOB));
- this.blobDescriptorFields = options.blobDescriptorField();
- this.blobExternalStorageFields = options.blobExternalStorageField();
- this.blobExternalStoragePath = options.blobExternalStoragePath();
-
+ this.blobContext = BlobFileContext.create(rowType, options);
this.fileIndexOptions = options.indexColumnsOptions();
}
@Override
public BaseAppendFileStoreWrite withBlobConsumer(BlobConsumer
blobConsumer) {
- this.blobConsumer = blobConsumer;
+ if (blobContext != null) {
+ blobContext = blobContext.withBlobConsumer(blobConsumer);
+ }
return this;
}
@@ -159,17 +151,16 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
fileIndexOptions,
options.asyncFileWrite(),
options.statsDenseStore(),
- blobConsumer,
- blobDescriptorFields,
- blobExternalStorageFields,
- blobExternalStoragePath,
- options.dataEvolutionEnabled());
+ options.dataEvolutionEnabled(),
+ blobContext);
}
@Override
public void withWriteType(RowType writeType) {
this.writeType = writeType;
- this.withBlob = writeType.getFieldTypes().stream().anyMatch(t ->
t.is(BLOB));
+ if (blobContext != null) {
+ blobContext = blobContext.withWriteType(writeType);
+ }
int fullCount = rowType.getFieldCount();
List<String> fullNames = rowType.getFieldNames();
this.writeCols = writeType.getFieldNames();
@@ -302,7 +293,7 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
if (ioManager == null) {
return;
}
- if (withBlob) {
+ if (blobContext != null) {
return;
}
if (forceBufferSpill) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
new file mode 100644
index 0000000000..2069574579
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BlobConsumer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+
+import static org.apache.paimon.types.DataTypeRoot.BLOB;
+
+/** Context describing how blob fields of a row type are written to blob files. */
+public class BlobFileContext {
+
+ private final Set<String> blobDescriptorFields;
+ private final Set<String> blobExternalStorageFields;
+ @Nullable private final String blobExternalStoragePath;
+
+ private @Nullable BlobConsumer blobConsumer;
+
+ private BlobFileContext(
+ Set<String> blobDescriptorFields,
+ Set<String> blobExternalStorageFields,
+ @Nullable String blobExternalStoragePath) {
+ this.blobDescriptorFields = blobDescriptorFields;
+ this.blobExternalStorageFields = blobExternalStorageFields;
+ this.blobExternalStoragePath = blobExternalStoragePath;
+ }
+
+ @Nullable
+ public static BlobFileContext create(RowType rowType, CoreOptions options)
{
+ if (rowType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) {
+ return null;
+ }
+ Set<String> descriptorFields = options.blobDescriptorField();
+ Set<String> externalStorageField = options.blobExternalStorageField();
+ String externalStoragePath = options.blobExternalStoragePath();
+ boolean requireBlobFile = false;
+ for (DataField field : rowType.getFields()) {
+ DataTypeRoot type = field.type().getTypeRoot();
+ if (type == DataTypeRoot.BLOB
+ && (!descriptorFields.contains(field.name())
+ || externalStorageField.contains(field.name()))) {
+ requireBlobFile = true;
+ break;
+ }
+ }
+ if (!requireBlobFile) {
+ return null;
+ }
+ return new BlobFileContext(descriptorFields, externalStorageField,
externalStoragePath);
+ }
+
+ public BlobFileContext withBlobConsumer(BlobConsumer blobConsumer) {
+ this.blobConsumer = blobConsumer;
+ return this;
+ }
+
+ public BlobFileContext withWriteType(RowType writeType) {
+ if (writeType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) {
+ return null;
+ }
+ return this;
+ }
+
+ public Set<String> blobDescriptorFields() {
+ return blobDescriptorFields;
+ }
+
+ public Set<String> blobExternalStorageFields() {
+ return blobExternalStorageFields;
+ }
+
+ @Nullable
+ public String blobExternalStoragePath() {
+ return blobExternalStoragePath;
+ }
+
+ @Nullable
+ public BlobConsumer blobConsumer() {
+ return blobConsumer;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 4aac1257f7..ebb5a0f9c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -31,7 +31,6 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
@@ -41,7 +40,6 @@ import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -79,6 +77,7 @@ import static
org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
import static
org.apache.paimon.table.PrimaryKeyTableUtils.createMergeFunctionFactory;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
+import static org.apache.paimon.types.BlobType.fieldsNotInBlobFile;
import static org.apache.paimon.types.DataTypeRoot.ARRAY;
import static org.apache.paimon.types.DataTypeRoot.MAP;
import static org.apache.paimon.types.DataTypeRoot.MULTISET;
@@ -163,7 +162,7 @@ public class SchemaValidation {
Set<String> blobDescriptorFields =
validateBlobDescriptorFields(tableRowType, options);
validateBlobExternalStorageFields(tableRowType, options,
blobDescriptorFields);
fileFormat.validateDataFields(
- BlobType.splitBlob(tableRowType,
blobDescriptorFields).getLeft());
+ new RowType(fieldsNotInBlobFile(tableRowType,
blobDescriptorFields)));
// Check column names in schema
schema.fieldNames()
@@ -611,14 +610,18 @@ public class SchemaValidation {
"Data evolution config must disabled with
clustering.incremental");
}
- Pair<RowType, RowType> normalAndBlobType =
BlobType.splitBlob(schema.logicalRowType());
- List<String> blobNames = normalAndBlobType.getRight().getFieldNames();
+ List<DataField> fields = schema.fields();
+ List<String> blobNames =
+ fields.stream()
+ .filter(field -> field.type().is(DataTypeRoot.BLOB))
+ .map(DataField::name)
+ .collect(Collectors.toList());
if (!blobNames.isEmpty()) {
checkArgument(
options.dataEvolutionEnabled(),
"Data evolution config must enabled for table with BLOB
type column.");
checkArgument(
- normalAndBlobType.getLeft().getFieldCount() > 0,
+ fields.size() > blobNames.size(),
"Table with BLOB type column must have other normal
columns.");
checkArgument(
blobNames.stream().noneMatch(schema.partitionKeys()::contains),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index ad0ca18f19..b2d820adf8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -41,6 +41,7 @@ import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.operation.BlobFileContext;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
@@ -712,11 +713,8 @@ public class AppendOnlyWriterTest {
new FileIndexOptions(),
true,
false,
- null,
- options.blobDescriptorField(),
- options.blobExternalStorageField(),
- options.blobExternalStoragePath(),
- options.dataEvolutionEnabled());
+ options.dataEvolutionEnabled(),
+ BlobFileContext.create(AppendOnlyWriterTest.SCHEMA,
options));
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
return Pair.of(writer, compactManager.allFiles());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index 2b37f7dc72..11095cb7d0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.operation.BlobFileContext;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -43,7 +44,6 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -91,6 +91,7 @@ public class RollingBlobFileWriterTest {
seqNumCounter = new LongCounter();
// Initialize the writer
+ CoreOptions options = new CoreOptions(new Options());
writer =
new RollingBlobFileWriter(
fileIO,
@@ -102,15 +103,11 @@ public class RollingBlobFileWriterTest {
pathFactory,
() -> seqNumCounter,
COMPRESSION,
- new StatsCollectorFactories(new CoreOptions(new
Options())),
+ new StatsCollectorFactories(options),
new FileIndexOptions(),
FileSource.APPEND,
- false, // asyncFileWrite
false, // statsDenseStore
- null,
- Collections.emptySet(),
- Collections.emptySet(),
- null);
+ BlobFileContext.create(SCHEMA, options));
}
@Test
@@ -208,12 +205,8 @@ public class RollingBlobFileWriterTest {
new StatsCollectorFactories(new CoreOptions(new
Options())),
new FileIndexOptions(),
FileSource.APPEND,
- false, // asyncFileWrite
false, // statsDenseStore
- null,
- Collections.emptySet(),
- Collections.emptySet(),
- null);
+ BlobFileContext.create(SCHEMA, new CoreOptions(new
Options())));
// Create large blob data that will exceed the blob target file size
byte[] largeBlobData = new byte[3 * 1024 * 1024]; // 3 MB blob data
@@ -288,12 +281,8 @@ public class RollingBlobFileWriterTest {
new StatsCollectorFactories(new CoreOptions(new
Options())),
new FileIndexOptions(),
FileSource.APPEND,
- false, // asyncFileWrite
false, // statsDenseStore
- null,
- Collections.emptySet(),
- Collections.emptySet(),
- null);
+ BlobFileContext.create(SCHEMA, new CoreOptions(new
Options())));
// Create blob data that will trigger rolling
byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -370,12 +359,8 @@ public class RollingBlobFileWriterTest {
new StatsCollectorFactories(new CoreOptions(new
Options())),
new FileIndexOptions(),
FileSource.APPEND,
- false, // asyncFileWrite
false, // statsDenseStore
- null,
- Collections.emptySet(),
- Collections.emptySet(),
- null);
+ BlobFileContext.create(SCHEMA, new CoreOptions(new
Options())));
// Create blob data that will trigger rolling
byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -590,12 +575,8 @@ public class RollingBlobFileWriterTest {
new StatsCollectorFactories(new CoreOptions(new
Options())),
new FileIndexOptions(),
FileSource.APPEND,
- false, // asyncFileWrite
false, // statsDenseStore
- null,
- Collections.emptySet(),
- Collections.emptySet(),
- null);
+ BlobFileContext.create(SCHEMA, new CoreOptions(new
Options())));
// Write data
for (int i = 0; i < 3; i++) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 5dc5635bea..e350cb2a9a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -42,6 +42,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.operation.BlobFileContext;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
@@ -291,11 +292,8 @@ public class KeyValueFileReadWriteTest {
new FileIndexOptions(),
true,
false,
- null,
- options.blobDescriptorField(),
- options.blobExternalStorageField(),
- options.blobExternalStoragePath(),
- options.dataEvolutionEnabled());
+ options.dataEvolutionEnabled(),
+ BlobFileContext.create(schema, options));
appendOnlyWriter.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
appendOnlyWriter.write(