This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2c7bb263b7 [core] Support external-storage BLOB columns and MERGE INTO
updates for descriptor-based BLOB columns (#7328)
2c7bb263b7 is described below
commit 2c7bb263b7da0bec1940ba888af0c8fd7f0b5ff5
Author: Junrui Lee <[email protected]>
AuthorDate: Tue Mar 3 13:53:54 2026 +0800
[core] Support external-storage BLOB columns and MERGE INTO updates for
descriptor-based BLOB columns (#7328)
This PR adds support for descriptor-based BLOB fields backed by external
storage. For these fields, Paimon writes raw BLOB data to a configured
external storage path at write time and stores only serialized
BlobDescriptors inline in data files. The change also adds validation
for the new external-storage BLOB options and verifies that raw-data
BLOB fields, descriptor-based BLOB fields, and descriptor-based BLOB
fields backed by external storage can coexist in the same table.
This PR also refines MERGE INTO validation for BLOB columns in Flink and
Spark. Updates are still rejected for raw-data BLOB columns, but are now
allowed for descriptor-based BLOB columns, including those backed by
external storage.
---
docs/content/append-table/blob.md | 88 +++++-
.../shortcodes/generated/core_configuration.html | 12 +
.../main/java/org/apache/paimon/CoreOptions.java | 58 +++-
.../org/apache/paimon/append/AppendOnlyWriter.java | 13 +-
.../paimon/append/ExternalStorageBlobWriter.java | 329 +++++++++++++++++++++
.../paimon/append/RollingBlobFileWriter.java | 40 ++-
.../org/apache/paimon/io/DataFilePathFactory.java | 6 +
.../paimon/operation/BaseAppendFileStoreWrite.java | 6 +
.../org/apache/paimon/schema/SchemaValidation.java | 32 ++
.../apache/paimon/append/AppendOnlyWriterTest.java | 2 +
.../org/apache/paimon/append/BlobTableTest.java | 175 +++++++++++
.../paimon/append/RollingBlobFileWriterTest.java | 20 +-
.../paimon/io/KeyValueFileReadWriteTest.java | 2 +
.../flink/action/DataEvolutionMergeIntoAction.java | 29 +-
.../org/apache/paimon/flink/BlobTableITCase.java | 101 ++++++-
.../action/DataEvolutionMergeIntoActionITCase.java | 182 ++++++++++++
.../spark/commands/DataEvolutionPaimonWriter.scala | 11 +-
.../org/apache/paimon/spark/sql/BlobTestBase.scala | 90 ++++++
18 files changed, 1177 insertions(+), 19 deletions(-)
diff --git a/docs/content/append-table/blob.md
b/docs/content/append-table/blob.md
index 39dea1eee9..9d3135d8b2 100644
--- a/docs/content/append-table/blob.md
+++ b/docs/content/append-table/blob.md
@@ -69,6 +69,21 @@ This separation provides several benefits:
For details about the blob file format structure, see [File Format - BLOB]({{<
ref "concepts/spec/fileformat#blob" >}}).
+## Storage Modes
+
+Paimon supports three storage modes for BLOB fields:
+
+1. **Default blob storage**
+ Blob bytes are written to Paimon-managed `.blob` files under the table path.
+
+2. **Descriptor-only storage**
+ Fields configured in `blob-descriptor-field` store only serialized
`BlobDescriptor` bytes inline in data files. Paimon does not write `.blob`
files for these fields, and writes must provide descriptor-based input.
+
+3. **External-storage descriptor mode**
+ Fields configured in `blob-external-storage-field` must be a subset of
`blob-descriptor-field`. At write time, Paimon writes the raw blob data to the
configured `blob-external-storage-path` and stores only serialized
`BlobDescriptor` bytes inline in data files.
+
+This allows one table to mix raw-data BLOB fields, descriptor-only BLOB
fields, and descriptor-based BLOB fields backed by external storage.
+
## Table Options
<table class="table table-bordered">
@@ -108,6 +123,27 @@ For details about the blob file format structure, see
[File Format - BLOB]({{< r
some BLOB fields in <code>.blob</code> files and some as descriptor
references.
</td>
</tr>
+ <tr>
+ <td><h5>blob-external-storage-field</h5></td>
+ <td>No</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>
+ Comma-separated BLOB field names whose raw data should be written to
external storage at write time.
+ The configured field names must be a subset of <code>blob-descriptor-field</code>.
+ For these fields, Paimon stores serialized <code>BlobDescriptor</code>
bytes inline in data files.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>blob-external-storage-path</h5></td>
+ <td>No</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>
+ External storage path used by fields configured in
<code>blob-external-storage-field</code>.
+ Orphan file cleanup is not applied to this path.
+ </td>
+ </tr>
<tr>
<td><h5>blob.target-file-size</h5></td>
<td>No</td>
@@ -243,6 +279,31 @@ ALTER TABLE blob_table SET ('blob-as-descriptor' =
'false');
SELECT image FROM blob_table;
```
+### External-Storage Descriptor Fields
+
+If you want Paimon to accept raw BLOB input, write the data to an external
location, and store only descriptor bytes inline, configure the target field(s)
like this:
+
+```sql
+'blob-descriptor-field' = 'image',
+'blob-external-storage-field' = 'image',
+'blob-external-storage-path' = 's3://my-bucket/paimon-external-blobs/'
+```
+
+For these configured fields:
+
+- Paimon writes the raw blob data to `blob-external-storage-path`
+- Paimon stores serialized `BlobDescriptor` bytes inline in normal data files
+- the field remains descriptor-based when reading and updating
+- orphan file cleanup is not applied to the external storage path
+
+### MERGE INTO Support
+
+For Data Evolution writes in Flink and Spark:
+
+- raw-data BLOB columns are still rejected in partial-column `MERGE INTO`
updates
+- descriptor-based BLOB columns are allowed
+- fields configured in `blob-external-storage-field` are also allowed because
they are descriptor-based fields
+
## Java API Usage
### Creating a Table
@@ -447,7 +508,8 @@ Paimon write path is descriptor-aware automatically:
1. For blob fields stored in `.blob` files, input can be either blob bytes or
a `BlobDescriptor`.
2. For fields configured in `blob-descriptor-field`, Paimon stores descriptor
bytes inline in data files (no `.blob` files for those fields), and input must
be a descriptor.
-3. This behavior does not depend on `blob-as-descriptor`.
+3. For fields configured in `blob-external-storage-field`, Paimon writes the
blob data to `blob-external-storage-path` and stores descriptor bytes inline in
data files.
+4. This behavior does not depend on `blob-as-descriptor`.
```java
import org.apache.paimon.catalog.Catalog;
@@ -575,12 +637,30 @@ If you want downstream tables to **reuse** upstream blob
files (no copying and n
For these configured fields, Paimon stores only serialized
<code>BlobDescriptor</code> bytes in normal data files. Reading the blob
follows the descriptor URI to access bytes, and writing requires descriptor
input for those fields.
+### Blob storage mode: EXTERNAL STORAGE
+
+If you want Paimon to write raw blob data to a separate external location
while keeping only descriptor bytes inline, configure the target blob field(s):
+
+```sql
+'blob-descriptor-field' = 'image',
+'blob-external-storage-field' = 'image',
+'blob-external-storage-path' = 'oss://bucket/path/'
+```
+
+For these configured fields:
+
+- raw blob data is written to the configured external storage path
+- normal data files keep only serialized <code>BlobDescriptor</code> bytes
+- writes can still start from raw BLOB input
+- the field is treated as descriptor-based for operations such as `MERGE INTO`
+
## Limitations
1. **Append Table Only**: Blob type is designed for append-only tables.
Primary key tables are not supported.
2. **No Predicate Pushdown**: Blob columns cannot be used in filter predicates.
3. **No Statistics**: Statistics collection is not supported for blob columns.
4. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled`
must be set to `true`.
+5. **External Storage Cleanup**: Files written through
`blob-external-storage-path` are outside Paimon's orphan file cleanup scope.
## Best Practices
@@ -590,4 +670,8 @@ For these configured fields, Paimon stores only serialized
<code>BlobDescriptor<
3. **Use Descriptor Fields When Reusing External Blob Files**: Configure
`blob-descriptor-field` for fields that should keep descriptor references
instead of writing new `.blob` files.
-4. **Use Partitioning**: Partition your blob tables by date or other
dimensions to improve query performance and data management.
+4. **Use External-Storage Fields When Accepting Raw Input But Storing
Descriptors**: Configure `blob-external-storage-field` together with
`blob-external-storage-path` when upstream writes raw blob bytes but you want
descriptor-based storage.
+
+5. **Manage External Storage Lifecycle Separately**: Files written to
`blob-external-storage-path` are not cleaned up by Paimon, so retention and
deletion should be managed externally.
+
+6. **Use Partitioning**: Partition your blob tables by date or other
dimensions to improve query performance and data management.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 4f012492d8..90419b609b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -56,6 +56,18 @@ under the License.
<td>Boolean</td>
<td>Write blob field using blob descriptor rather than blob
bytes.</td>
</tr>
+ <tr>
+ <td><h5>blob-external-storage-field</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Comma-separated BLOB field names (must be a subset of
'blob-descriptor-field') whose raw data will be written to external storage at
write time. The external storage path is configured via
'blob-external-storage-path'. Orphan file cleanup is not applied to that
path.</td>
+ </tr>
+ <tr>
+ <td><h5>blob-external-storage-path</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The external storage path where raw BLOB data from fields
configured by 'blob-external-storage-field' is written at write time. Orphan
file cleanup is not applied to this path.</td>
+ </tr>
<tr>
<td><h5>blob-descriptor-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index ca3c576512..12799630cc 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2182,6 +2182,29 @@ public class CoreOptions implements Serializable {
.withDescription(
"Write blob field using blob descriptor rather
than blob bytes.");
+ @Immutable
+ public static final ConfigOption<String> BLOB_EXTERNAL_STORAGE_PATH =
+ key("blob-external-storage-path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The external storage path where raw BLOB data
from fields configured "
+ + "by 'blob-external-storage-field' is
written at write time. "
+ + "Orphan file cleanup is not applied to
this path.");
+
+ @Immutable
+ public static final ConfigOption<String> BLOB_EXTERNAL_STORAGE_FIELD =
+ key("blob-external-storage-field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Comma-separated BLOB field names (must be a
subset of '"
+ + BLOB_DESCRIPTOR_FIELD.key()
+ + "') whose raw data will be written to
external storage at "
+ + "write time. The external storage path
is configured via '"
+ + BLOB_EXTERNAL_STORAGE_PATH.key()
+ + "'. Orphan file cleanup is not applied
to that path.");
+
public static final ConfigOption<Boolean> COMMIT_DISCARD_DUPLICATE_FILES =
key("commit.discard-duplicate-files")
.booleanType()
@@ -2745,7 +2768,40 @@ public class CoreOptions implements Serializable {
* <p>If this option is not set, all blob fields are stored in '.blob'
files by default.
*/
public Set<String> blobDescriptorField() {
- return options.getOptional(BLOB_DESCRIPTOR_FIELD)
+ return parseCommaSeparatedSet(BLOB_DESCRIPTOR_FIELD);
+ }
+
+ /**
+ * Resolve blob fields whose data should be written to external storage at
write time. These
+ * fields must be a subset of {@link #blobDescriptorField()}.
+ */
+ public Set<String> blobExternalStorageField() {
+ return parseCommaSeparatedSet(BLOB_EXTERNAL_STORAGE_FIELD);
+ }
+
+ /**
+ * Returns the set of BLOB fields that support partial updates (e.g. via
MERGE INTO).
+ *
+ * <p>Currently, only descriptor-based BLOB fields (configured via {@link
+ * #BLOB_DESCRIPTOR_FIELD}) are updatable. Raw-data BLOB fields are not
updatable because the
+ * update cost is too high. Fields configured by {@link
#BLOB_EXTERNAL_STORAGE_FIELD} are a
+ * subset of descriptor fields and therefore are also updatable.
+ */
+ public Set<String> updatableBlobFields() {
+ return blobDescriptorField();
+ }
+
+ /**
+ * Return the external storage path for descriptor BLOB fields whose raw data
is written outside the
+ * table location. Returns null if not configured.
+ */
+ @Nullable
+ public String blobExternalStoragePath() {
+ return options.get(BLOB_EXTERNAL_STORAGE_PATH);
+ }
+
+ private Set<String> parseCommaSeparatedSet(ConfigOption<String> option) {
+ return options.getOptional(option)
.map(
s ->
Arrays.stream(s.split(","))
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index b3ca6e6f69..4c2ebebc39 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -96,6 +96,8 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
private final MemorySize maxDiskSize;
@Nullable private final BlobConsumer blobConsumer;
private final Set<String> blobDescriptorFields;
+ private final Set<String> blobExternalStorageFields;
+ @Nullable private final String blobExternalStoragePath;
@Nullable private CompactDeletionFile compactDeletionFile;
private SinkWriter<InternalRow> sinkWriter;
@@ -127,6 +129,8 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
boolean statsDenseStore,
@Nullable BlobConsumer blobConsumer,
Set<String> blobDescriptorFields,
+ Set<String> blobExternalStorageFields,
+ @Nullable String blobExternalStoragePath,
boolean dataEvolutionEnabled) {
this.fileIO = fileIO;
this.schemaId = schemaId;
@@ -156,6 +160,8 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
this.maxDiskSize = maxDiskSize;
this.fileIndexOptions = fileIndexOptions;
this.blobDescriptorFields = blobDescriptorFields;
+ this.blobExternalStorageFields = blobExternalStorageFields;
+ this.blobExternalStoragePath = blobExternalStoragePath;
this.sinkWriter =
useWriteBuffer
@@ -307,7 +313,8 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
}
private RollingFileWriter<InternalRow, DataFileMeta>
createRollingRowWriter() {
- if (BlobType.splitBlob(writeSchema,
blobDescriptorFields).getRight().getFieldCount() > 0) {
+ if (BlobType.splitBlob(writeSchema,
blobDescriptorFields).getRight().getFieldCount() > 0
+ || !blobExternalStorageFields.isEmpty()) {
return new RollingBlobFileWriter(
fileIO,
schemaId,
@@ -328,7 +335,9 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
false,
statsDenseStore,
blobConsumer,
- blobDescriptorFields);
+ blobDescriptorFields,
+ blobExternalStorageFields,
+ blobExternalStoragePath);
}
return new RowDataRollingFileWriter(
fileIO,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/ExternalStorageBlobWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/ExternalStorageBlobWriter.java
new file mode 100644
index 0000000000..843506dd6d
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/append/ExternalStorageBlobWriter.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.casting.FallbackMappingRow;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobConsumer;
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RollingFileWriterImpl;
+import org.apache.paimon.io.RowDataFileWriter;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.UriReader;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * A writer for descriptor BLOB fields whose raw data is written to a
configured external storage path.
+ *
+ * <p>For each configured field, this writer writes BLOB data to the external
storage path using
+ * {@link BlobFileFormat} (standard blob format with CRC32 and index), then
replaces the BLOB value
+ * with a {@link BlobDescriptor}.
+ *
+ * <p>The external storage path is configured via {@link
CoreOptions#BLOB_EXTERNAL_STORAGE_PATH}.
+ * Internally, each field writer reuses the same {@link ProjectedFileWriter} +
{@link
+ * RollingFileWriterImpl} infrastructure as {@link MultipleBlobFileWriter},
with rolling support.
+ * The produced {@link DataFileMeta} is discarded since these files live
outside Paimon's
+ * management.
+ */
+public class ExternalStorageBlobWriter implements Closeable {
+
+ private final List<ExternalStorageBlobFieldWriter> fieldWriters;
+ private final UriReader uriReader;
+ private final GenericRow overrideRow;
+ private final FallbackMappingRow resultRow;
+
+ public ExternalStorageBlobWriter(
+ FileIO fileIO,
+ long schemaId,
+ RowType writeSchema,
+ Set<String> externalStorageFields,
+ String externalStoragePath,
+ DataFilePathFactory pathFactory,
+ Supplier<LongCounter> seqNumCounterSupplier,
+ FileSource fileSource,
+ boolean asyncFileWrite,
+ boolean statsDenseStore,
+ long targetFileSize) {
+ checkNotNull(
+ externalStoragePath,
+ "'%s' must be set when '%s' is configured.",
+ CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key(),
+ CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key());
+
+ this.fieldWriters =
+ createFieldWriters(
+ fileIO,
+ schemaId,
+ writeSchema,
+ externalStorageFields,
+ externalStoragePath,
+ pathFactory,
+ seqNumCounterSupplier,
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ targetFileSize);
+ this.uriReader = UriReader.fromFile(fileIO);
+
+ int fieldCount = writeSchema.getFieldCount();
+ this.overrideRow = new GenericRow(fieldCount);
+ // identity mappings: position i maps to position i
+ this.resultRow = new FallbackMappingRow(IntStream.range(0,
fieldCount).toArray());
+ }
+
+ /**
+ * Transform a row by writing BLOB data for configured fields to the
external storage path and
+ * replacing them with {@link BlobDescriptor} references.
+ *
+ * @return a new row with configured fields replaced, or the original row
if none configured
+ */
+ public InternalRow transformRow(InternalRow row) throws IOException {
+ if (fieldWriters.isEmpty()) {
+ return row;
+ }
+
+ // clear all override positions so non-overridden fields fall back to
delegate
+ for (ExternalStorageBlobFieldWriter fw : fieldWriters) {
+ overrideRow.setField(fw.fieldIndex(), null);
+ }
+
+ for (ExternalStorageBlobFieldWriter fw : fieldWriters) {
+ BlobDescriptor descriptor = fw.writeAndReplace(row);
+ if (descriptor != null) {
+ overrideRow.setField(fw.fieldIndex(),
Blob.fromDescriptor(uriReader, descriptor));
+ }
+ }
+
+ // override row as main (non-null wins), original row as fallback
+ return resultRow.replace(overrideRow, row);
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (ExternalStorageBlobFieldWriter fw : fieldWriters) {
+ fw.close();
+ }
+ }
+
+ public void abort() {
+ for (ExternalStorageBlobFieldWriter fw : fieldWriters) {
+ fw.abort();
+ }
+ }
+
+ // ------------------------------ Helper methods
------------------------------
+
+ private static List<ExternalStorageBlobFieldWriter> createFieldWriters(
+ FileIO fileIO,
+ long schemaId,
+ RowType writeSchema,
+ Set<String> externalStorageFields,
+ String externalStoragePath,
+ DataFilePathFactory pathFactory,
+ Supplier<LongCounter> seqNumCounterSupplier,
+ FileSource fileSource,
+ boolean asyncFileWrite,
+ boolean statsDenseStore,
+ long targetFileSize) {
+ List<ExternalStorageBlobFieldWriter> writers = new ArrayList<>();
+ for (DataField field : writeSchema.getFields()) {
+ if (field.type().getTypeRoot() == DataTypeRoot.BLOB
+ && externalStorageFields.contains(field.name())) {
+ writers.add(
+ createFieldWriter(
+ fileIO,
+ schemaId,
+ writeSchema,
+ field.name(),
+ externalStoragePath,
+ pathFactory,
+ seqNumCounterSupplier,
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ targetFileSize));
+ }
+ }
+ return writers;
+ }
+
+ private static ExternalStorageBlobFieldWriter createFieldWriter(
+ FileIO fileIO,
+ long schemaId,
+ RowType writeSchema,
+ String fieldName,
+ String externalStoragePath,
+ DataFilePathFactory pathFactory,
+ Supplier<LongCounter> seqNumCounterSupplier,
+ FileSource fileSource,
+ boolean asyncFileWrite,
+ boolean statsDenseStore,
+ long targetFileSize) {
+ int fieldIndex = writeSchema.getFieldIndex(fieldName);
+ ExternalStorageBlobFieldWriter fieldWriter = new
ExternalStorageBlobFieldWriter(fieldIndex);
+
+ BlobFileFormat blobFileFormat = new BlobFileFormat();
+ blobFileFormat.setWriteConsumer(fieldWriter);
+
+ RowType projectedType = writeSchema.project(fieldName);
+ fieldWriter.setWriter(
+ new ProjectedFileWriter<>(
+ createRollingBlobWriter(
+ fileIO,
+ schemaId,
+ blobFileFormat,
+ projectedType,
+ fieldName,
+ externalStoragePath,
+ pathFactory,
+ seqNumCounterSupplier,
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ targetFileSize),
+ writeSchema.projectIndexes(singletonList(fieldName))));
+
+ return fieldWriter;
+ }
+
+ private static RollingFileWriterImpl<InternalRow, DataFileMeta>
createRollingBlobWriter(
+ FileIO fileIO,
+ long schemaId,
+ BlobFileFormat blobFileFormat,
+ RowType projectedType,
+ String fieldName,
+ String externalStoragePath,
+ DataFilePathFactory pathFactory,
+ Supplier<LongCounter> seqNumCounterSupplier,
+ FileSource fileSource,
+ boolean asyncFileWrite,
+ boolean statsDenseStore,
+ long targetFileSize) {
+ return new RollingFileWriterImpl<>(
+ () ->
+ new RowDataFileWriter(
+ fileIO,
+ RollingFileWriter.createFileWriterContext(
+ blobFileFormat,
+ projectedType,
+ new SimpleColStatsCollector.Factory[] {
+ NoneSimpleColStatsCollector::new
+ },
+ "none"),
+
pathFactory.newExternalStorageBlobPath(externalStoragePath),
+ projectedType,
+ schemaId,
+ seqNumCounterSupplier,
+ new FileIndexOptions(),
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ false,
+ singletonList(fieldName)),
+ targetFileSize);
+ }
+
+ // ------------------------------ Inner class
------------------------------
+
+ /**
+ * Writes one descriptor BLOB field backed by external storage using
{@link ProjectedFileWriter}
+ * with rolling support. Implements {@link BlobConsumer} to directly
capture the {@link
+ * BlobDescriptor} produced by {@link BlobFileFormat} after each write.
The produced {@link
+ * DataFileMeta} is discarded since the files are written outside the
table location.
+ */
+ private static class ExternalStorageBlobFieldWriter implements
BlobConsumer, Closeable {
+
+ private final int fieldIndex;
+ private ProjectedFileWriter<
+ RollingFileWriterImpl<InternalRow, DataFileMeta>,
List<DataFileMeta>>
+ writer;
+
+ /** The descriptor captured from the last {@link #accept} call. */
+ @Nullable private BlobDescriptor lastDescriptor;
+
+ ExternalStorageBlobFieldWriter(int fieldIndex) {
+ this.fieldIndex = fieldIndex;
+ }
+
+ void setWriter(
+ ProjectedFileWriter<
+ RollingFileWriterImpl<InternalRow,
DataFileMeta>,
+ List<DataFileMeta>>
+ writer) {
+ this.writer = writer;
+ }
+
+ int fieldIndex() {
+ return fieldIndex;
+ }
+
+ @Override
+ public boolean accept(String blobFieldName, BlobDescriptor
blobDescriptor) {
+ this.lastDescriptor = blobDescriptor;
+ return true;
+ }
+
+ /**
+ * Write the BLOB value from {@code src} to the external storage path.
+ *
+ * @return the {@link BlobDescriptor} for the written data, or null if
the field is null
+ */
+ @Nullable
+ BlobDescriptor writeAndReplace(InternalRow src) throws IOException {
+ writer.write(src);
+ return lastDescriptor;
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ void abort() {
+ writer.abort();
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index a34023889d..2fc794e7e0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -85,6 +85,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
private final Supplier<MultipleBlobFileWriter> blobWriterFactory;
private final long targetFileSize;
private final Set<String> blobDescriptorFields;
+ @Nullable private final ExternalStorageBlobWriter
externalStorageBlobWriter;
// State management
private final List<FileWriterAbortExecutor> closedWriters;
@@ -112,7 +113,9 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
boolean asyncFileWrite,
boolean statsDenseStore,
@Nullable BlobConsumer blobConsumer,
- Set<String> blobDescriptorFields) {
+ Set<String> blobDescriptorFields,
+ Set<String> blobExternalStorageFields,
+ @Nullable String blobExternalStoragePath) {
// Initialize basic fields
this.targetFileSize = targetFileSize;
this.results = new ArrayList<>();
@@ -151,6 +154,25 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
blobTargetFileSize,
blobConsumer,
this.blobDescriptorFields);
+
+ // Initialize writer for descriptor fields backed by external storage
if needed.
+ if (!blobExternalStorageFields.isEmpty()) {
+ this.externalStorageBlobWriter =
+ new ExternalStorageBlobWriter(
+ fileIO,
+ schemaId,
+ writeSchema,
+ blobExternalStorageFields,
+ blobExternalStoragePath,
+ pathFactory,
+ seqNumCounterSupplier,
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ blobTargetFileSize);
+ } else {
+ this.externalStorageBlobWriter = null;
+ }
}
/** Creates a factory for normal data writers. */
@@ -207,14 +229,20 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
@Override
public void write(InternalRow row) throws IOException {
try {
+ // Transform descriptor BLOB fields backed by external storage
first.
+ InternalRow transformedRow =
+ externalStorageBlobWriter != null
+ ? externalStorageBlobWriter.transformRow(row)
+ : row;
+
if (currentWriter == null) {
currentWriter = writerFactory.get();
}
if (blobWriter == null) {
blobWriter = blobWriterFactory.get();
}
- currentWriter.write(row);
- blobWriter.write(row);
+ blobWriter.write(transformedRow);
+ currentWriter.write(transformedRow);
recordCount++;
if (rollingFile()) {
@@ -274,6 +302,9 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
blobWriter.abort();
blobWriter = null;
}
+ if (externalStorageBlobWriter != null) {
+ externalStorageBlobWriter.abort();
+ }
}
/** Checks if the current file should be rolled based on size and record
count. */
@@ -378,6 +409,9 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
try {
closeCurrentWriter();
+ if (externalStorageBlobWriter != null) {
+ externalStorageBlobWriter.close();
+ }
} catch (IOException e) {
handleCloseException(e);
throw e;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index b63a1c0b7a..608262bc19 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -86,6 +86,12 @@ public class DataFilePathFactory {
return newPathFromName(newFileName(dataFilePrefix, ".blob"));
}
+ /** Create a new blob file path under the given external storage path for
descriptor fields. */
+ public Path newExternalStorageBlobPath(String externalStoragePath) {
+ String fileName = newFileName(dataFilePrefix, ".blob");
+ return new Path(externalStoragePath, fileName);
+ }
+
public Path newChangelogPath() {
return newPath(changelogFilePrefix);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 3050419cc6..b88935bfa1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -87,6 +87,8 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
private boolean withBlob;
private @Nullable BlobConsumer blobConsumer;
private final Set<String> blobDescriptorFields;
+ private final Set<String> blobExternalStorageFields;
+ @Nullable private final String blobExternalStoragePath;
public BaseAppendFileStoreWrite(
FileIO fileIO,
@@ -111,6 +113,8 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
this.pathFactory = pathFactory;
this.withBlob = rowType.getFieldTypes().stream().anyMatch(t ->
t.is(BLOB));
this.blobDescriptorFields = options.blobDescriptorField();
+ this.blobExternalStorageFields = options.blobExternalStorageField();
+ this.blobExternalStoragePath = options.blobExternalStoragePath();
this.fileIndexOptions = options.indexColumnsOptions();
}
@@ -157,6 +161,8 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
options.statsDenseStore(),
blobConsumer,
blobDescriptorFields,
+ blobExternalStorageFields,
+ blobExternalStoragePath,
options.dataEvolutionEnabled());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index e8b536388f..4aac1257f7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -161,6 +161,7 @@ public class SchemaValidation {
FileFormat.fromIdentifier(options.formatType(), new
Options(schema.options()));
RowType tableRowType = new RowType(schema.fields());
Set<String> blobDescriptorFields =
validateBlobDescriptorFields(tableRowType, options);
+ validateBlobExternalStorageFields(tableRowType, options,
blobDescriptorFields);
fileFormat.validateDataFields(
BlobType.splitBlob(tableRowType,
blobDescriptorFields).getLeft());
@@ -642,6 +643,37 @@ public class SchemaValidation {
return configured;
}
+ private static void validateBlobExternalStorageFields(
+ RowType rowType, CoreOptions options, Set<String>
blobDescriptorFields) {
+ Set<String> blobFieldNames =
+ rowType.getFields().stream()
+ .filter(field -> field.type().getTypeRoot() ==
DataTypeRoot.BLOB)
+ .map(DataField::name)
+ .collect(Collectors.toCollection(HashSet::new));
+ Set<String> configured = options.blobExternalStorageField();
+ for (String field : configured) {
+ checkArgument(
+ blobFieldNames.contains(field),
+ "Field '%s' in '%s' must be a BLOB field in table schema.",
+ field,
+ CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key());
+ checkArgument(
+ blobDescriptorFields.contains(field),
+ "Field '%s' in '%s' must also be in '%s'.",
+ field,
+ CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(),
+ CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
+ }
+ if (!configured.isEmpty()) {
+ String externalStoragePath = options.blobExternalStoragePath();
+ checkArgument(
+ externalStoragePath != null &&
!externalStoragePath.isEmpty(),
+ "'%s' must be set when '%s' is configured.",
+ CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key(),
+ CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key());
+ }
+ }
+
private static void validateIncrementalClustering(TableSchema schema,
CoreOptions options) {
if (options.clusteringIncrementalEnabled()) {
checkArgument(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index d90972d742..ad0ca18f19 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -714,6 +714,8 @@ public class AppendOnlyWriterTest {
false,
null,
options.blobDescriptorField(),
+ options.blobExternalStorageField(),
+ options.blobExternalStoragePath(),
options.dataEvolutionEnabled());
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 34d10386a2..dc0c6c0ac4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -50,12 +50,14 @@ import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -333,6 +335,179 @@ public class BlobTableTest extends TableTestBase {
.hasMessageContaining("blob-descriptor-field");
}
+ @Test
+ public void testExternalStorageBlobField() throws Exception {
+ createExternalStorageTable();
+ FileStoreTable table = getTableDefault();
+
+ writeDataDefault(
+ Collections.singletonList(
+ GenericRow.of(
+ 1, BinaryString.fromString("copy-test"), new
BlobData(blobBytes))));
+
+ readDefault(
+ row -> {
+
assertThat(row.getString(1).toString()).isEqualTo("copy-test");
+ // The blob should be readable via descriptor
+ assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+ });
+
+ // Verify the blob file was written to the external storage path
+ java.nio.file.Path externalStoragePath =
tempPath.resolve("external-storage-blob-path");
+ assertThat(Files.exists(externalStoragePath)).isTrue();
+ try (Stream<java.nio.file.Path> stream =
Files.list(externalStoragePath)) {
+ long externalStorageFiles =
+ stream.filter(p ->
p.getFileName().toString().endsWith(".blob")).count();
+ assertThat(externalStorageFiles).isGreaterThanOrEqualTo(1);
+ }
+
+ // Verify no .blob files were created in the table directory
+ long blobFiles = countFilesWithSuffix(table.fileIO(),
table.location(), ".blob");
+ assertThat(blobFiles).isEqualTo(0);
+ }
+
+ @Test
+ public void testThreeTypeBlobCoexistence() throws Exception {
+ createThreeTypeBlobTable();
+ FileStoreTable table = getTableDefault();
+
+ // Prepare external blob for the descriptor field
+ byte[] descriptorBytes = randomBytes();
+ Path external = new
Path(tempPath.resolve("upstream-three-type.bin").toString());
+ writeFile(table.fileIO(), external, descriptorBytes);
+
+ BlobDescriptor descriptor =
+ new BlobDescriptor(external.toString(), 0,
descriptorBytes.length);
+ UriReader uriReader = UriReader.fromFile(table.fileIO());
+ Blob blobRef = Blob.fromDescriptor(uriReader, descriptor);
+
+ // Prepare data for the descriptor field backed by external storage
+ byte[] copyBytes = randomBytes();
+
+ writeDataDefault(
+ Collections.singletonList(
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("three-types"),
+ new BlobData(blobBytes), // raw-data blob
+ blobRef, // descriptor blob
+ new BlobData(copyBytes)))); // descriptor blob
with external storage
+
+ readDefault(
+ row -> {
+
assertThat(row.getString(1).toString()).isEqualTo("three-types");
+ // Raw-data blob
+ assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+ // Descriptor blob
+
assertThat(row.getBlob(3).toDescriptor()).isEqualTo(descriptor);
+
assertThat(row.getBlob(3).toData()).isEqualTo(descriptorBytes);
+ // External-storage descriptor blob
+ assertThat(row.getBlob(4).toData()).isEqualTo(copyBytes);
+ });
+
+ // Verify raw-data blob files exist (for f2)
+ long blobFiles = countFilesWithSuffix(table.fileIO(),
table.location(), ".blob");
+ assertThat(blobFiles).isGreaterThanOrEqualTo(1);
+
+ // Verify descriptor files backed by external storage exist in the
configured path
+ java.nio.file.Path externalStoragePath =
+ tempPath.resolve("external-storage-blob-path-3type");
+ assertThat(Files.exists(externalStoragePath)).isTrue();
+ try (Stream<java.nio.file.Path> stream =
Files.list(externalStoragePath)) {
+ long externalStorageFiles =
+ stream.filter(p ->
p.getFileName().toString().endsWith(".blob")).count();
+ assertThat(externalStorageFiles).isGreaterThanOrEqualTo(1);
+ }
+ }
+
+ @Test
+ public void testExternalStorageFieldValidationRequiresPath() {
+ assertThatThrownBy(
+ () -> {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+
schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
+ schemaBuilder.option(
+
CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "f2");
+ // No external storage path set
+ catalog.createTable(identifier(),
schemaBuilder.build(), true);
+ })
+ .hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "'"
+ + CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key()
+ + "' must be set when '"
+ + CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()
+ + "' is configured.");
+ }
+
+ @Test
+ public void testExternalStorageFieldMustBeSubsetOfDescriptorField() {
+ assertThatThrownBy(
+ () -> {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ // f2 is configured for external storage but not in
+ // blob-descriptor-field
+ schemaBuilder.option(
+
CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "f2");
+ schemaBuilder.option(
+
CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key(), "/tmp/target");
+ catalog.createTable(identifier(),
schemaBuilder.build(), true);
+ })
+ .hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Field 'f2' in '"
+ + CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()
+ + "' must also be in '"
+ + CoreOptions.BLOB_DESCRIPTOR_FIELD.key()
+ + "'.");
+ }
+
+ private void createExternalStorageTable() throws Exception {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
+ schemaBuilder.option(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(),
"f2");
+ schemaBuilder.option(
+ CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key(),
+ tempPath.resolve("external-storage-blob-path").toString());
+ catalog.createTable(identifier(), schemaBuilder.build(), true);
+ }
+
+ private void createThreeTypeBlobTable() throws Exception {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB()); // raw-data blob
+ schemaBuilder.column("f3", DataTypes.BLOB()); // descriptor blob
+ schemaBuilder.column("f4", DataTypes.BLOB()); // descriptor blob with
external storage
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f3,f4");
+ schemaBuilder.option(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(),
"f4");
+ schemaBuilder.option(
+ CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key(),
+
tempPath.resolve("external-storage-blob-path-3type").toString());
+ catalog.createTable(identifier(), schemaBuilder.build(), true);
+ }
+
private void createDescriptorTable() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index 3ada89c777..2b37f7dc72 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -108,7 +108,9 @@ public class RollingBlobFileWriterTest {
false, // asyncFileWrite
false, // statsDenseStore
null,
- Collections.emptySet());
+ Collections.emptySet(),
+ Collections.emptySet(),
+ null);
}
@Test
@@ -209,7 +211,9 @@ public class RollingBlobFileWriterTest {
false, // asyncFileWrite
false, // statsDenseStore
null,
- Collections.emptySet());
+ Collections.emptySet(),
+ Collections.emptySet(),
+ null);
// Create large blob data that will exceed the blob target file size
byte[] largeBlobData = new byte[3 * 1024 * 1024]; // 3 MB blob data
@@ -287,7 +291,9 @@ public class RollingBlobFileWriterTest {
false, // asyncFileWrite
false, // statsDenseStore
null,
- Collections.emptySet());
+ Collections.emptySet(),
+ Collections.emptySet(),
+ null);
// Create blob data that will trigger rolling
byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -367,7 +373,9 @@ public class RollingBlobFileWriterTest {
false, // asyncFileWrite
false, // statsDenseStore
null,
- Collections.emptySet());
+ Collections.emptySet(),
+ Collections.emptySet(),
+ null);
// Create blob data that will trigger rolling
byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -585,7 +593,9 @@ public class RollingBlobFileWriterTest {
false, // asyncFileWrite
false, // statsDenseStore
null,
- Collections.emptySet());
+ Collections.emptySet(),
+ Collections.emptySet(),
+ null);
// Write data
for (int i = 0; i < 3; i++) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 7b74bfd7bb..5dc5635bea 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -293,6 +293,8 @@ public class KeyValueFileReadWriteTest {
false,
null,
options.blobDescriptorField(),
+ options.blobExternalStorageField(),
+ options.blobExternalStoragePath(),
options.dataEvolutionEnabled());
appendOnlyWriter.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
index bec57cd7f4..8b0901ca88 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -37,6 +37,7 @@ import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
@@ -68,6 +69,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -416,6 +418,9 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
.collect(Collectors.toMap(DataField::name,
Function.identity()));
List<String> partitionKeys = ((FileStoreTable)
table).schema().partitionKeys();
+ // Get updatable BLOB fields (descriptor-based fields that support
partial updates)
+ Set<String> updatableBlobFields = coreOptions.updatableBlobFields();
+
List<Column> flinkColumns = source.getResolvedSchema().getColumns();
boolean foundRowIdColumn = false;
for (Column flinkColumn : flinkColumns) {
@@ -434,15 +439,33 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
throw new IllegalStateException(
"Column not found in target table: " +
flinkColumn.getName());
}
- if (targetField.type().getTypeRoot() == DataTypeRoot.BLOB) {
+ if (targetField.type().getTypeRoot() == DataTypeRoot.BLOB
+ &&
!updatableBlobFields.contains(flinkColumn.getName())) {
throw new IllegalStateException(
- "Should not append/update new BLOB column through
MERGE INTO.");
+ "Should not append/update raw-data BLOB column '"
+ + flinkColumn.getName()
+ + "' through MERGE INTO. "
+ + "Only descriptor-based BLOB columns
(configured via '"
+ + CoreOptions.BLOB_DESCRIPTOR_FIELD.key()
+ + "' or '"
+ +
CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()
+ + "') can be updated.");
}
DataType paimonType =
LogicalTypeConversion.toDataType(
flinkColumn.getDataType().getLogicalType());
- if (!DataTypeCasts.supportsCompatibleCast(paimonType,
targetField.type())) {
+ // For descriptor-based BLOB fields, allow BYTES
(VARBINARY/BINARY) source
+ // since BLOB is represented as BYTES in Flink SQL
+ boolean blobCompatible =
+ targetField.type().getTypeRoot() == DataTypeRoot.BLOB
+ &&
updatableBlobFields.contains(flinkColumn.getName())
+ && paimonType
+ .getTypeRoot()
+ .getFamilies()
+
.contains(DataTypeFamily.BINARY_STRING);
+ if (!blobCompatible
+ && !DataTypeCasts.supportsCompatibleCast(paimonType,
targetField.type())) {
throw new IllegalStateException(
String.format(
"DataType incompatible of field %s: %s is
not compatible with %s",
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 1d192cb437..24b411ed7d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -32,10 +32,12 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.OutputStream;
import java.net.URI;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@@ -47,10 +49,24 @@ public class BlobTableITCase extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
+ String externalStoragePath =
warehouse.resolve("external-storage-blob-path").toString();
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING,
picture BYTES) WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true', 'blob-field'='picture')",
"CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT,
data STRING, picture BYTES) WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true', 'blob-field'='picture',
'blob-as-descriptor'='true')",
- "CREATE TABLE IF NOT EXISTS multiple_blob_table (id INT, data
STRING, pic1 BYTES, pic2 BYTES) WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true', 'blob-field'='pic1,pic2')");
+ "CREATE TABLE IF NOT EXISTS multiple_blob_table (id INT, data
STRING, pic1 BYTES, pic2 BYTES) WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true', 'blob-field'='pic1,pic2')",
+ String.format(
+ "CREATE TABLE IF NOT EXISTS copy_blob_table (id INT,
data STRING, picture BYTES)"
+ + " WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true',"
+ + " 'blob-field'='picture',
'blob-descriptor-field'='picture',"
+ + " 'blob-external-storage-field'='picture',
'blob-external-storage-path'='%s')",
+ externalStoragePath),
+ String.format(
+ "CREATE TABLE IF NOT EXISTS three_type_blob_table"
+ + " (id INT, data STRING, raw_pic BYTES,
desc_pic BYTES, copy_pic BYTES)"
+ + " WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true',"
+ + " 'blob-field'='raw_pic,desc_pic,copy_pic',
'blob-descriptor-field'='desc_pic,copy_pic',"
+ + " 'blob-external-storage-field'='copy_pic',
'blob-external-storage-path'='%s')",
+ externalStoragePath));
}
@Test
@@ -152,6 +168,89 @@ public class BlobTableITCase extends CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
}
+ @Test
+ public void testExternalStorageBlob() throws Exception {
+ // Write raw data; descriptor mode with external storage should write
to the external path.
+ batchSql("INSERT INTO copy_blob_table VALUES (1, 'copy-test',
X'48656C6C6F')");
+
+ // Read back — the blob should be readable
+ List<Row> result = batchSql("SELECT * FROM copy_blob_table");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, "copy-test", new byte[] {72, 101, 108, 108,
111}));
+
+ // Verify blob files exist in the external storage path
+ Path externalStorageDir =
warehouse.resolve("external-storage-blob-path");
+ assertThat(Files.exists(externalStorageDir)).isTrue();
+ try (Stream<Path> stream = Files.list(externalStorageDir)) {
+ long externalStorageFiles =
+ stream.filter(p ->
p.getFileName().toString().endsWith(".blob")).count();
+ assertThat(externalStorageFiles).isGreaterThanOrEqualTo(1);
+ }
+ }
+
+ @Test
+ public void testThreeTypeBlobCoexistence() throws Exception {
+ // Prepare external blob for the descriptor field
+ byte[] descBlobData = new byte[256];
+ RANDOM.nextBytes(descBlobData);
+ FileIO fileIO = new LocalFileIO();
+ String uri = "file://" + warehouse + "/external_desc_blob";
+ try (OutputStream outputStream =
+ fileIO.newOutputStream(new org.apache.paimon.fs.Path(uri),
true)) {
+ outputStream.write(descBlobData);
+ }
+
+ BlobDescriptor descriptor = new BlobDescriptor(uri, 0,
descBlobData.length);
+
+ // raw_pic = raw bytes, desc_pic = descriptor, copy_pic = descriptor
with external storage
+ batchSql(
+ String.format(
+ "INSERT INTO three_type_blob_table VALUES"
+ + " (1, 'three-types', X'48656C6C6F', X'%s',
X'5945')",
+ bytesToHex(descriptor.serialize())));
+
+ // Read back and verify all three blob types
+ List<Row> result =
+ batchSql("SELECT id, data, raw_pic, copy_pic FROM
three_type_blob_table");
+ assertThat(result.size()).isEqualTo(1);
+ Row row = result.get(0);
+ assertThat(row.getField(0)).isEqualTo(1);
+ assertThat(row.getField(1)).isEqualTo("three-types");
+ assertThat((byte[]) row.getField(2)).isEqualTo(new byte[] {72, 101,
108, 108, 111});
+ assertThat((byte[]) row.getField(3)).isEqualTo(new byte[] {89, 69});
+
+ // Verify descriptor files backed by external storage exist in the
configured path
+ Path externalStorageDir =
warehouse.resolve("external-storage-blob-path");
+ assertThat(Files.exists(externalStorageDir)).isTrue();
+ try (Stream<Path> stream = Files.list(externalStorageDir)) {
+ long externalStorageFiles =
+ stream.filter(p ->
p.getFileName().toString().endsWith(".blob")).count();
+ assertThat(externalStorageFiles).isGreaterThanOrEqualTo(1);
+ }
+ }
+
+ @Test
+ public void testExternalStorageBlobMultipleWrites() throws Exception {
+ // Write two batches; each should create separate descriptor files in
external storage.
+ batchSql("INSERT INTO copy_blob_table VALUES (1, 'row1',
X'48656C6C6F')");
+ batchSql("INSERT INTO copy_blob_table VALUES (2, 'row2', X'5945')");
+
+ // Read back
+ List<Row> result = batchSql("SELECT * FROM copy_blob_table ORDER BY
id");
+ assertThat(result.size()).isEqualTo(2);
+ assertThat(result.get(0)).isEqualTo(Row.of(1, "row1", new byte[] {72,
101, 108, 108, 111}));
+ assertThat(result.get(1)).isEqualTo(Row.of(2, "row2", new byte[] {89,
69}));
+
+ // Verify multiple blob files exist in the external storage path
+ Path externalStorageDir =
warehouse.resolve("external-storage-blob-path");
+ try (Stream<Path> stream = Files.list(externalStorageDir)) {
+ long externalStorageFiles =
+ stream.filter(p ->
p.getFileName().toString().endsWith(".blob")).count();
+ assertThat(externalStorageFiles).isGreaterThanOrEqualTo(2);
+ }
+ }
+
private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
public static String bytesToHex(byte[] bytes) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
index 0aa0941270..18fa729787 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -18,6 +18,9 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BlobDescriptor;
+
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -441,6 +444,173 @@ public class DataEvolutionMergeIntoActionITCase extends
ActionITCaseBase {
action.rewriteMergeCondition(mergeCondition));
}
+ @Test
+ public void testUpdateRawBlobColumnThrowsError() throws Exception {
+ // Create a table with raw-data BLOB column
+ sEnv.executeSql(
+ buildDdl(
+ "RAW_BLOB_T",
+ Arrays.asList("id INT", "name STRING", "picture
BYTES"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<String, String>() {
+ {
+ put(ROW_TRACKING_ENABLED.key(), "true");
+ put(DATA_EVOLUTION_ENABLED.key(), "true");
+ put("blob-field", "picture");
+ }
+ }));
+ insertInto("RAW_BLOB_T", "(1, 'name1', X'48656C6C6F')");
+
+ // Create source table
+ sEnv.executeSql(
+ buildDdl(
+ "RAW_BLOB_S",
+ Arrays.asList("id INT", "picture BYTES"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<String, String>() {
+ {
+ put(ROW_TRACKING_ENABLED.key(), "true");
+ put(DATA_EVOLUTION_ENABLED.key(), "true");
+ put("blob-field", "picture");
+ }
+ }));
+ insertInto("RAW_BLOB_S", "(1, X'4E4557')");
+
+ DataEvolutionMergeIntoAction action =
+ builder(warehouse, database, "RAW_BLOB_T")
+ .withMergeCondition("RAW_BLOB_T.id=RAW_BLOB_S.id")
+
.withMatchedUpdateSet("RAW_BLOB_T.picture=RAW_BLOB_S.picture")
+ .withSourceTable("RAW_BLOB_S")
+ .withSinkParallelism(1)
+ .build();
+
+ Throwable t = Assertions.assertThrows(IllegalStateException.class, ()
-> action.run());
+ Assertions.assertTrue(
+ t.getMessage().contains("raw-data BLOB column"),
+ "Expected error about raw-data BLOB column but got: " +
t.getMessage());
+ }
+
+ @Test
+ public void testUpdateNonBlobColumnOnDescriptorBlobTableSucceeds() throws
Exception {
+ // Create a table with descriptor BLOB column.
+ // Previously, MERGE INTO would reject ANY table with BLOB columns.
+ // With our change, tables with descriptor-based BLOB columns are
accepted
+ // as long as the BLOB column itself is descriptor-based.
+ sEnv.executeSql(
+ buildDdl(
+ "DESC_BLOB_T",
+ Arrays.asList("id INT", "name STRING", "picture
BYTES"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<String, String>() {
+ {
+ put(ROW_TRACKING_ENABLED.key(), "true");
+ put(DATA_EVOLUTION_ENABLED.key(), "true");
+ put("blob-field", "picture");
+ put("blob-descriptor-field", "picture");
+ }
+ }));
+
+ // Insert data with descriptor-based BLOB
+ BlobDescriptor desc1 = new BlobDescriptor("file:///dummy/blob1", 0, 2);
+ BlobDescriptor desc2 = new BlobDescriptor("file:///dummy/blob2", 0, 2);
+ insertInto(
+ "DESC_BLOB_T", String.format("(1, 'name1', X'%s')",
bytesToHex(desc1.serialize())));
+ insertInto(
+ "DESC_BLOB_T", String.format("(2, 'name2', X'%s')",
bytesToHex(desc2.serialize())));
+
+ // Create source table for the update (only non-BLOB columns)
+ sEnv.executeSql(
+ buildDdl(
+ "DESC_BLOB_S",
+ Arrays.asList("id INT", "name STRING"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<String, String>() {
+ {
+ put(ROW_TRACKING_ENABLED.key(), "true");
+ put(DATA_EVOLUTION_ENABLED.key(), "true");
+ }
+ }));
+ insertInto("DESC_BLOB_S", "(1, 'updated_name1')");
+
+ // Update only the 'name' column via MERGE INTO.
+ // This should succeed — the table has BLOB columns but they are
descriptor-based.
+ builder(warehouse, database, "DESC_BLOB_T")
+ .withMergeCondition("DESC_BLOB_T.id=DESC_BLOB_S.id")
+ .withMatchedUpdateSet("DESC_BLOB_T.name=DESC_BLOB_S.name")
+ .withSourceTable("DESC_BLOB_S")
+ .withSinkParallelism(1)
+ .build()
+ .run();
+
+ // Verify: id=1 name updated, id=2 unchanged
+ List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", 1, "updated_name1"),
changelogRow("+I", 2, "name2"));
+ testBatchRead("SELECT id, name FROM DESC_BLOB_T ORDER BY id",
expected);
+ }
+
+ @Test
+ public void testUpdateExternalStorageBlobColumnSucceeds() throws Exception
{
+ // Create a temp path for descriptor blobs backed by external storage.
+ String externalStoragePath = getTempDirPath("external-storage-path");
+
+ // Create a table with a descriptor BLOB column backed by external
storage.
+ sEnv.executeSql(
+ buildDdl(
+ "COPY_UPD_T",
+ Arrays.asList("id INT", "name STRING", "picture
BYTES"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<String, String>() {
+ {
+ put(ROW_TRACKING_ENABLED.key(), "true");
+ put(DATA_EVOLUTION_ENABLED.key(), "true");
+ put("blob-field", "picture");
+ put("blob-descriptor-field", "picture");
+
put(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "picture");
+ put(
+
CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key(),
+ externalStoragePath);
+ }
+ }));
+
+ // Insert initial row with raw bytes; write path stores data in
external storage and keeps
+ // descriptor bytes.
+ insertInto("COPY_UPD_T", "(1, 'name1', X'48656C6C6F')");
+
+ // Create source table with new raw bytes for the BLOB column
+ sEnv.executeSql(
+ buildDdl(
+ "COPY_UPD_S",
+ Arrays.asList("id INT", "picture BYTES"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<String, String>() {
+ {
+ put(ROW_TRACKING_ENABLED.key(), "true");
+ put(DATA_EVOLUTION_ENABLED.key(), "true");
+ }
+ }));
+ insertInto("COPY_UPD_S", "(1, X'574F524C44')");
+
+ // Update this descriptor BLOB column backed by external storage via
MERGE INTO.
+ builder(warehouse, database, "COPY_UPD_T")
+ .withMergeCondition("COPY_UPD_T.id=COPY_UPD_S.id")
+ .withMatchedUpdateSet("COPY_UPD_T.picture=COPY_UPD_S.picture")
+ .withSourceTable("COPY_UPD_S")
+ .withSinkParallelism(1)
+ .build()
+ .run();
+
+ // Verify: name stays the same, BLOB column should have new data
+ List<Row> expected = Arrays.asList(changelogRow("+I", 1, "name1"));
+ testBatchRead("SELECT id, name FROM COPY_UPD_T ORDER BY id", expected);
+ }
+
private void prepareTargetTable() throws Exception {
sEnv.executeSql(
buildDdl(
@@ -571,4 +741,16 @@ public class DataEvolutionMergeIntoActionITCase extends
ActionITCaseBase {
return createAction(DataEvolutionMergeIntoAction.class, args);
}
}
+
+ private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
+
+ private static String bytesToHex(byte[] bytes) {
+ char[] hexChars = new char[bytes.length * 2];
+ for (int j = 0; j < bytes.length; j++) {
+ int v = bytes[j] & 0xFF;
+ hexChars[j * 2] = HEX_ARRAY[v >>> 4];
+ hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
+ }
+ return new String(hexChars);
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
index a5d9278523..d5b8725e5d 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
@@ -61,9 +61,16 @@ case class DataEvolutionPaimonWriter(paimonTable:
FileStoreTable, dataSplits: Se
assert(data.columns.length == columnNames.size + 2)
val writeType = table.rowType().project(columnNames.asJava)
- if (writeType.getFieldTypes.stream.anyMatch((t: DataType) => t.is(BLOB))) {
+ val options = new CoreOptions(table.schema().options())
+ val updatableBlobFields = options.updatableBlobFields()
+ val hasRawDataBlob = writeType.getFields.asScala.exists(
+ f => f.`type`().is(BLOB) && !updatableBlobFields.contains(f.name()))
+ if (hasRawDataBlob) {
throw new UnsupportedOperationException(
- "DataEvolution does not support writing partial columns mixed with
BLOB type.")
+ "DataEvolution does not support writing partial columns with raw-data
BLOB type. " +
+ "Only descriptor-based BLOB columns (configured via '" +
+ CoreOptions.BLOB_DESCRIPTOR_FIELD.key() + "' or '" +
+ CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key() + "') can be updated.")
}
val written =
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 1c108a9abf..86e0a0dce2 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -224,6 +224,96 @@ class BlobTestBase extends PaimonSparkTestBase {
}
}
+ test("Blob: merge-into rejects updating raw-data BLOB column") {
+ withTable("s", "t") {
+ sql("CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES
" +
+ "('row-tracking.enabled'='true', 'data-evolution.enabled'='true',
'blob-field'='picture')")
+ sql("INSERT INTO t VALUES (1, 'name1', X'48656C6C6F')")
+
+ sql("CREATE TABLE s (id INT, picture BINARY)")
+ sql("INSERT INTO s VALUES (1, X'4E4557')")
+
+ val e = intercept[UnsupportedOperationException] {
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN UPDATE SET t.picture = s.picture
+ |""".stripMargin)
+ }
+ assert(e.getMessage.contains("raw-data BLOB"))
+ }
+ }
+
+ test("Blob: merge-into updates non-blob column on descriptor blob table") {
+ withTable("s", "t") {
+ sql(
+ "CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " +
+ "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " +
+ "'blob-field'='picture', 'blob-descriptor-field'='picture')")
+
+ // Insert with a descriptor pointing to a real file
+ val blobData = new Array[Byte](256)
+ RANDOM.nextBytes(blobData)
+ val fileIO = new LocalFileIO()
+ val uri = "file://" + tempDBDir.getCanonicalPath + "/external_desc_blob"
+ val out = fileIO.newOutputStream(new Path(uri), true)
+ try { out.write(blobData) }
+ finally { out.close() }
+ val desc = new BlobDescriptor(uri, 0, blobData.length)
+ sql(s"INSERT INTO t VALUES (1, 'name1',
X'${bytesToHex(desc.serialize())}')")
+ sql(s"INSERT INTO t VALUES (2, 'name2',
X'${bytesToHex(desc.serialize())}')")
+
+ sql("CREATE TABLE s (id INT, name STRING)")
+ sql("INSERT INTO s VALUES (1, 'updated_name1')")
+
+ // Update only the 'name' column — should succeed for descriptor-based
blob table
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN UPDATE SET t.name = s.name
+ |""".stripMargin)
+
+ checkAnswer(
+ sql("SELECT id, name FROM t ORDER BY id"),
+ Seq(Row(1, "updated_name1"), Row(2, "name2"))
+ )
+ }
+ }
+
+ test("Blob: merge-into updates descriptor blob column with external storage
end-to-end") {
+ withTable("s", "t") {
+ val externalStoragePath = tempDBDir.getCanonicalPath +
"/external-storage-blob-merge-path"
+ sql(
+ s"CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES "
+
+ s"('row-tracking.enabled'='true', 'data-evolution.enabled'='true', "
+
+ s"'blob-field'='picture', 'blob-descriptor-field'='picture', " +
+ s"'blob-external-storage-field'='picture', " +
+ s"'blob-external-storage-path'='$externalStoragePath')")
+
+ // Insert initial rows (each writes raw data to external storage and stores
descriptor bytes)
+ sql("INSERT INTO t VALUES (1, 'name1', X'48656C6C6F')")
+ sql("INSERT INTO t VALUES (2, 'name2', X'5945')")
+
+ sql("CREATE TABLE s (id INT, name STRING)")
+ sql("INSERT INTO s VALUES (1, 'updated_name1')")
+
+ // Update the 'name' column via MERGE INTO
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN UPDATE SET t.name = s.name
+ |""".stripMargin)
+
+ checkAnswer(
+ sql("SELECT id, name FROM t ORDER BY id"),
+ Seq(Row(1, "updated_name1"), Row(2, "name2"))
+ )
+ }
+ }
+
private val HEX_ARRAY = "0123456789ABCDEF".toCharArray
def bytesToHex(bytes: Array[Byte]): String = {