This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 46b120da29 [core] support adding blob column through comments (#7996)
46b120da29 is described below

commit 46b120da2918ea51d0f4c51768fed1184f4356d2
Author: Faiz <[email protected]>
AuthorDate: Fri May 29 13:25:49 2026 +0800

    [core] support adding blob column through comments (#7996)
---
 docs/docs/append-table/blob.mdx                    |  50 +++++
 .../main/java/org/apache/paimon/CoreOptions.java   |   1 +
 .../org/apache/paimon/schema/BlobSchemaUtils.java  | 191 ++++++++++++++++++
 .../org/apache/paimon/schema/SchemaManager.java    |  71 ++++++-
 .../paimon/append/MultipleBlobTableTest.java       |   3 +-
 .../apache/paimon/schema/BlobSchemaUtilsTest.java  | 134 +++++++++++++
 .../apache/paimon/table/SchemaEvolutionTest.java   | 219 +++++++++++++++++++++
 .../apache/paimon/flink/SchemaChangeITCase.java    |  21 ++
 .../paimon/spark/SparkSchemaEvolutionITCase.java   |  34 ++++
 9 files changed, 720 insertions(+), 4 deletions(-)

diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx
index dfe78709b2..a3d4eedca5 100644
--- a/docs/docs/append-table/blob.mdx
+++ b/docs/docs/append-table/blob.mdx
@@ -268,6 +268,56 @@ CREATE TABLE image_table (
 
 </Tabs>
 
+### Adding a Blob Column to an Existing Table
+
+A BLOB column can be added to an existing blob-enabled table with a single `ALTER TABLE ADD COLUMN` statement. Because most SQL engines have no `BLOB` type syntax, the new column is declared as `BYTES` or `BINARY`, and the BLOB storage mode is selected with a **comment directive**:
+
+- `__BLOB_FIELD`: store the column as a default blob (raw bytes are written to `.blob` files; equivalent to listing the column in `blob-field`).
+- `__BLOB_DESCRIPTOR_FIELD`: store the column as a descriptor-only blob, inline in the data files (equivalent to listing the column in `blob-descriptor-field`).
+
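+Anything after the optional `;` separator, with surrounding whitespace trimmed, is kept as the column's real comment. Directives are case-sensitive and must appear at the very beginning of the comment.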
+
+<Tabs groupId="blob-add-column">
+
+<TabItem value="flink-sql" label="Flink SQL">
+
+```sql
+-- Add a blob-field column (no extra user comment)
+ALTER TABLE image_table ADD picture BYTES COMMENT '__BLOB_FIELD';
+
+-- Add a descriptor-field column with a real user comment
+ALTER TABLE image_table
+    ADD video BYTES COMMENT '__BLOB_DESCRIPTOR_FIELD; promotional video';
+```
+
+</TabItem>
+
+<TabItem value="spark-sql" label="Spark SQL">
+
+```sql
+-- Add a blob-field column (no extra user comment)
+ALTER TABLE image_table ADD COLUMN picture BINARY COMMENT '__BLOB_FIELD';
+
+-- Add a descriptor-field column with a real user comment
+ALTER TABLE image_table
+    ADD COLUMN video BINARY COMMENT '__BLOB_DESCRIPTOR_FIELD; promotional video';
+```
+
+</TabItem>
+
+</Tabs>
+
+Paimon converts the declared `BYTES`/`BINARY` type to `BLOB` (keeping its nullability), appends the new column name to the matching option (`blob-field` or `blob-descriptor-field`), and stores the trimmed real comment on the column. The whole operation is a single atomic schema change, so there is no need to `SET` an option first and then `ADD COLUMN`.
+
+#### Limitations
+
+1. **Storage mode must be explicit.** Only `__BLOB_FIELD` and `__BLOB_DESCRIPTOR_FIELD` are accepted. `blob-view-field` and `blob-external-storage-field` cannot be added this way; they must be configured at table creation time.
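+2. **Unknown `__BLOB`-prefixed directives are rejected**, so typos such as `__BLOB_FIELDS` do not silently fall through as a regular comment. Matching is case-sensitive, however: a lowercase `__blob_field` does not start with `__BLOB` and is stored as an ordinary comment.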
+3. **Column type must be `BYTES` / `BINARY` (or `BLOB` when calling the Java API directly).** Other types with a BLOB directive are rejected, and so is a directive on a nested (`ROW`) field: only top-level columns can be added as BLOB.
+4. **A raw BLOB without a directive is rejected.** When calling the Java API and passing `DataTypes.BLOB()` to `SchemaChange.addColumn`, the directive is still required so the storage mode (default vs. descriptor) is unambiguous.
+5. **Existing columns cannot be converted to or from BLOB.** `ALTER TABLE ... CHANGE`/`ALTER COLUMN TYPE` is rejected whenever the old or the new column type is BLOB, because either direction would break already-written data.
+6. **Dropping a BLOB column cleans the options automatically.** When you run `ALTER TABLE ... DROP COLUMN` on a top-level column, Paimon removes the dropped name from `blob-field` / `blob-descriptor-field` / `blob-view-field` / `blob-external-storage-field` (including their legacy fallback keys), so the remaining options stay consistent with the schema.
+
 ### Inserting Blob Data
 
 <Tabs groupId="blob-insert">
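The comment-directive rules described above fit in a few lines of Java. The block below is a minimal, self-contained sketch under stated assumptions, not Paimon's implementation: `BlobDirectiveSketch` and its `Parsed` holder are hypothetical names, and the option keys are plain string literals rather than `CoreOptions` constants. It follows the documented behavior: a case-sensitive `__BLOB` prefix, a mandatory `;` boundary after the marker, a trimmed real comment, and rejection of unknown directives.

```java
/**
 * Hypothetical sketch of the BLOB comment-directive rules; not Paimon's API.
 * Paimon's own parser in this commit is BlobSchemaUtils.parseAddColumnComment.
 */
final class BlobDirectiveSketch {

    static final String BLOB_FIELD = "__BLOB_FIELD";
    static final String BLOB_DESCRIPTOR_FIELD = "__BLOB_DESCRIPTOR_FIELD";

    /** The option the new column is appended to, plus the comment kept on the column. */
    static final class Parsed {
        final String optionKey;
        final String realComment; // null when nothing follows the directive

        Parsed(String optionKey, String realComment) {
            this.optionKey = optionKey;
            this.realComment = realComment;
        }
    }

    /** Returns null for an ordinary comment; throws for an unsupported __BLOB directive. */
    static Parsed parse(String comment) {
        // Case-sensitive prefix check on the raw comment: "__blob_field" stays ordinary.
        if (comment == null || !comment.startsWith("__BLOB")) {
            return null;
        }
        String c = comment.trim();
        String[][] directives = {
            {BLOB_FIELD, "blob-field"},
            {BLOB_DESCRIPTOR_FIELD, "blob-descriptor-field"}
        };
        for (String[] d : directives) {
            String marker = d[0];
            if (c.equals(marker)) {
                return new Parsed(d[1], null); // bare directive, no user comment
            }
            // The marker must be followed directly by ';', so "__BLOB_FIELDX" does not match.
            if (c.startsWith(marker + ";")) {
                String rest = c.substring(marker.length() + 1).trim();
                return new Parsed(d[1], rest.isEmpty() ? null : rest);
            }
        }
        throw new IllegalArgumentException(
                "Unsupported BLOB directive in column comment: '" + c + "'");
    }
}
```

Neither marker is a prefix of the other, so the order in which they are tried does not matter; it is the `;` boundary check that turns near-misses such as `__BLOB_FIELDX` into errors instead of silent matches.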
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 6b0ef75ff8..dc5755e1ba 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2308,6 +2308,7 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withDescription("Format table commit hive sync uri.");
 
+    @Immutable
     public static final ConfigOption<String> BLOB_FIELD =
             key("blob-field")
                     .stringType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java b/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java
new file mode 100644
index 0000000000..18e388ecca
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.FallbackKey;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** Utilities for BLOB-related schema evolution (ALTER TABLE ADD COLUMN comment directives). */
+public final class BlobSchemaUtils {
+
+    public static final String BLOB_FIELD_DIRECTIVE = "__BLOB_FIELD";
+    public static final String BLOB_DESCRIPTOR_FIELD_DIRECTIVE = "__BLOB_DESCRIPTOR_FIELD";
+
+    private BlobSchemaUtils() {}
+
+    /**
+     * Parses the comment of an {@code ALTER TABLE ADD COLUMN} statement. Returns {@code null} when
+     * the comment is a regular user comment; returns a {@link ParsedDirective} when the comment
+     * begins with a supported BLOB directive. Throws {@link IllegalArgumentException} when the
+     * comment begins with {@code __BLOB} but is not one of the supported directives.
+     */
+    @Nullable
+    public static ParsedDirective parseAddColumnComment(@Nullable String comment) {
+        if (comment == null || !comment.startsWith("__BLOB")) {
+            return null;
+        }
+        comment = StringUtils.trim(comment);
+        String optionKey = matchDirective(comment, BLOB_DESCRIPTOR_FIELD_DIRECTIVE);
+        String marker = BLOB_DESCRIPTOR_FIELD_DIRECTIVE;
+        if (optionKey == null) {
+            optionKey = matchDirective(comment, BLOB_FIELD_DIRECTIVE);
+            marker = BLOB_FIELD_DIRECTIVE;
+        }
+        Preconditions.checkArgument(
+                optionKey != null,
+                "Unsupported BLOB directive in column comment: '%s'. Supported directives are "
+                        + "'%s' and '%s'.",
+                comment,
+                BLOB_FIELD_DIRECTIVE,
+                BLOB_DESCRIPTOR_FIELD_DIRECTIVE);
+        String realComment =
+                comment.length() == marker.length()
+                        ? null
+                        : comment.substring(marker.length() + 1).trim();
+        if (realComment != null && realComment.isEmpty()) {
+            realComment = null;
+        }
+        return new ParsedDirective(optionKey, realComment);
+    }
+
+    @Nullable
+    private static String matchDirective(String comment, String marker) {
+        if (!comment.startsWith(marker)) {
+            return null;
+        }
+        if (comment.length() == marker.length()) {
+            return optionKeyFor(marker);
+        }
+        return comment.charAt(marker.length()) == ';' ? optionKeyFor(marker) : null;
+    }
+
+    private static String optionKeyFor(String marker) {
+        if (BLOB_FIELD_DIRECTIVE.equals(marker)) {
+            return CoreOptions.BLOB_FIELD.key();
+        } else if (BLOB_DESCRIPTOR_FIELD_DIRECTIVE.equals(marker)) {
+            return CoreOptions.BLOB_DESCRIPTOR_FIELD.key();
+        } else {
+            throw new IllegalArgumentException("Unsupported BLOB directive: " + marker);
+        }
+    }
+
+    /**
+     * Modifies blob options so that `blob-field` and `blob-descriptor-field` stay consistent with
+     * the actual schema. If the canonical key is empty but a fallback key holds the value (e.g. legacy
+     * {@code blob.stored-descriptor-fields}), the fallback value is migrated to the canonical key
+     * before appending so old entries are not shadowed.
+     */
+    public static void modifyBlobOptions(
+            String blobKey, String fieldName, Map<String, String> options) {
+        ConfigOption<String> option;
+        if (CoreOptions.BLOB_FIELD.key().equals(blobKey)) {
+            option = CoreOptions.BLOB_FIELD;
+        } else if (CoreOptions.BLOB_DESCRIPTOR_FIELD.key().equals(blobKey)) {
+            option = CoreOptions.BLOB_DESCRIPTOR_FIELD;
+        } else {
+            throw new IllegalArgumentException("Unsupported BLOB directive: " + blobKey);
+        }
+
+        String existing = options.get(blobKey);
+        if (existing == null || existing.isEmpty()) {
+            // migrate legacy fallback keys to current canonical key
+            for (FallbackKey fk : option.fallbackKeys()) {
+                String fallbackValue = options.remove(fk.getKey());
+                if (fallbackValue != null && !fallbackValue.isEmpty()) {
+                    existing = fallbackValue;
+                    break;
+                }
+            }
+        }
+        String newValue = existing == null ? fieldName : existing + "," + fieldName;
+        options.put(blobKey, newValue);
+    }
+
+    /**
+     * Removes {@code fieldName} from every BLOB-related comma-separated option (and the legacy
+     * fallback key for {@code blob-descriptor-field}). When the resulting csv becomes empty the
+     * option key is dropped entirely. Used when a BLOB column is being dropped.
+     */
+    public static void removeFromBlobOptions(String fieldName, Map<String, String> options) {
+        ConfigOption<String>[] keys =
+                new ConfigOption[] {
+                    CoreOptions.BLOB_FIELD,
+                    CoreOptions.BLOB_DESCRIPTOR_FIELD,
+                    CoreOptions.BLOB_VIEW_FIELD,
+                    CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD
+                };
+        for (ConfigOption<String> option : keys) {
+            removeFromCsvOption(option.key(), fieldName, options);
+            for (FallbackKey fk : option.fallbackKeys()) {
+                removeFromCsvOption(fk.getKey(), fieldName, options);
+            }
+        }
+    }
+
+    private static void removeFromCsvOption(
+            String key, String fieldName, Map<String, String> options) {
+        String existing = options.get(key);
+        if (existing == null || existing.isEmpty()) {
+            return;
+        }
+        StringBuilder sb = new StringBuilder();
+        for (String v : existing.split(",")) {
+            String trimmed = v.trim();
+            if (trimmed.isEmpty() || trimmed.equals(fieldName)) {
+                continue;
+            }
+            if (sb.length() > 0) {
+                sb.append(',');
+            }
+            sb.append(trimmed);
+        }
+        if (sb.length() == 0) {
+            options.remove(key);
+        } else {
+            options.put(key, sb.toString());
+        }
+    }
+
+    /** Parsed BLOB directive: the option key to update and the user-facing comment. */
+    public static final class ParsedDirective {
+        private final String optionKey;
+        @Nullable private final String realComment;
+
+        private ParsedDirective(String optionKey, @Nullable String realComment) {
+            this.optionKey = optionKey;
+            this.realComment = realComment;
+        }
+
+        public String optionKey() {
+            return optionKey;
+        }
+
+        @Nullable
+        public String realComment() {
+            return realComment;
+        }
+    }
+}
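To make the option bookkeeping concrete, here is a simplified, self-contained sketch of what happens to the comma-separated BLOB options on ADD COLUMN and DROP COLUMN. `BlobCsvOptionsSketch`, `append`, and `remove` are hypothetical names; the keys are string literals; and it assumes, as the tests in this commit suggest, that `blob.stored-descriptor-fields` is the only legacy fallback key (the code above reads fallbacks from `ConfigOption.fallbackKeys()` instead of a hard-coded table).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the csv bookkeeping on BLOB options; not Paimon's API.
 * Assumption: only blob-descriptor-field has a legacy fallback key.
 */
final class BlobCsvOptionsSketch {

    private static final Map<String, List<String>> KEYS_WITH_FALLBACKS = new LinkedHashMap<>();

    static {
        KEYS_WITH_FALLBACKS.put("blob-field", Collections.<String>emptyList());
        KEYS_WITH_FALLBACKS.put(
                "blob-descriptor-field", Collections.singletonList("blob.stored-descriptor-fields"));
        KEYS_WITH_FALLBACKS.put("blob-view-field", Collections.<String>emptyList());
        KEYS_WITH_FALLBACKS.put("blob-external-storage-field", Collections.<String>emptyList());
    }

    /** ADD COLUMN: appends the field; an empty canonical key first adopts a legacy value. */
    static void append(String key, String field, Map<String, String> options) {
        if (!key.equals("blob-field") && !key.equals("blob-descriptor-field")) {
            throw new IllegalArgumentException("Unsupported BLOB option: " + key);
        }
        String existing = options.get(key);
        if (existing == null || existing.isEmpty()) {
            for (String legacy : KEYS_WITH_FALLBACKS.get(key)) {
                String value = options.remove(legacy); // the legacy key is removed while reading
                if (value != null && !value.isEmpty()) {
                    existing = value;
                    break;
                }
            }
        }
        boolean empty = existing == null || existing.isEmpty();
        options.put(key, empty ? field : existing + "," + field);
    }

    /** DROP COLUMN: removes the field from every key; keys whose list becomes empty are dropped. */
    static void remove(String field, Map<String, String> options) {
        for (Map.Entry<String, List<String>> entry : KEYS_WITH_FALLBACKS.entrySet()) {
            removeFromCsv(entry.getKey(), field, options);
            for (String legacy : entry.getValue()) {
                removeFromCsv(legacy, field, options);
            }
        }
    }

    private static void removeFromCsv(String key, String field, Map<String, String> options) {
        String existing = options.get(key);
        if (existing == null || existing.isEmpty()) {
            return;
        }
        List<String> kept = new ArrayList<>();
        for (String part : existing.split(",")) {
            String name = part.trim();
            if (!name.isEmpty() && !name.equals(field)) {
                kept.add(name);
            }
        }
        if (kept.isEmpty()) {
            options.remove(key);
        } else {
            options.put(key, String.join(",", kept));
        }
    }
}
```

Because `append` folds a legacy value into the canonical key before writing, an old `blob.stored-descriptor-fields` entry is not shadowed by a freshly written `blob-descriptor-field`.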
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 670960889d..09ddeb14b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -39,6 +39,7 @@ import org.apache.paimon.schema.SchemaChange.UpdateComment;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.SchemaModification;
 import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeCasts;
@@ -342,8 +343,46 @@ public class SchemaManager implements Serializable {
                         "Column %s cannot specify NOT NULL in the %s table.",
                         String.join(".", addColumn.fieldNames()),
                         lazyIdentifier.get().getFullName());
+
+                BlobSchemaUtils.ParsedDirective blobDirective =
+                        BlobSchemaUtils.parseAddColumnComment(addColumn.description());
+                DataType requestedDataType = addColumn.dataType();
+                String effectiveComment = addColumn.description();
+                // try convert to blob type
+                if (blobDirective != null) {
+                    Preconditions.checkArgument(
+                            addColumn.fieldNames().length == 1,
+                            "BLOB directive cannot be used on a nested column %s.",
+                            String.join(".", addColumn.fieldNames()));
+                    DataTypeRoot root = requestedDataType.getTypeRoot();
+                    Preconditions.checkArgument(
+                            root == DataTypeRoot.VARBINARY
+                                    || root == DataTypeRoot.BINARY
+                                    || root == DataTypeRoot.BLOB,
+                            "Column %s declared with a BLOB directive must be of BYTES, "
+                                    + "BINARY or BLOB type, but was %s.",
+                            addColumn.fieldNames()[0],
+                            requestedDataType);
+                    requestedDataType = new BlobType(requestedDataType.isNullable());
+                    effectiveComment = blobDirective.realComment();
+
+                    BlobSchemaUtils.modifyBlobOptions(
+                            blobDirective.optionKey(), addColumn.fieldNames()[0], newOptions);
+                } else if (requestedDataType.is(DataTypeRoot.BLOB)) {
+                    // We do not permit adding blob type column without comment hint,
+                    // since we don't know the storage mode i.e. native blob or descriptor blob.
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Adding BLOB column %s requires a comment directive ('%s' "
+                                            + "or '%s') so the storage mode is explicit.",
+                                    String.join(".", addColumn.fieldNames()),
+                                    BlobSchemaUtils.BLOB_FIELD_DIRECTIVE,
+                                    BlobSchemaUtils.BLOB_DESCRIPTOR_FIELD_DIRECTIVE));
+                }
+
                 int id = highestFieldId.incrementAndGet();
-                DataType dataType = ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
+                DataType dataType = ReassignFieldId.reassign(requestedDataType, highestFieldId);
+                String storedComment = effectiveComment;
                 new NestedColumnModifier(addColumn.fieldNames(), lazyIdentifier) {
                     @Override
                     protected void updateLastColumn(
@@ -352,8 +391,7 @@ public class SchemaManager implements Serializable {
                                     Catalog.ColumnNotExistException {
                         assertColumnNotExists(newFields, fieldName, lazyIdentifier);
 
-                        DataField dataField =
-                                new DataField(id, fieldName, dataType, addColumn.description());
+                        DataField dataField = new DataField(id, fieldName, dataType, storedComment);
 
                         // key: name ; value : index
                         Map<String, Integer> map = new HashMap<>();
@@ -435,6 +473,9 @@ public class SchemaManager implements Serializable {
             } else if (change instanceof DropColumn) {
                 DropColumn drop = (DropColumn) change;
                 dropColumnValidation(oldTableSchema, drop);
+                if (drop.fieldNames().length == 1) {
+                    BlobSchemaUtils.removeFromBlobOptions(drop.fieldNames()[0], newOptions);
+                }
                 new NestedColumnModifier(drop.fieldNames(), lazyIdentifier) {
                     @Override
                     protected void updateLastColumn(
@@ -451,6 +492,8 @@ public class SchemaManager implements Serializable {
                 UpdateColumnType update = (UpdateColumnType) change;
                 assertNotUpdatingPartitionKeys(oldTableSchema, update.fieldNames(), "update");
                 assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
+                assertNotChangingBlobColumnType(
+                        newFields, update.fieldNames(), update.newDataType());
                 updateNestedColumn(
                         newFields,
                         update.fieldNames(),
@@ -923,6 +966,28 @@ public class SchemaManager implements Serializable {
         }
     }
 
+    private static void assertNotChangingBlobColumnType(
+            List<DataField> fields, String[] fieldNames, DataType newType) {
+        if (fieldNames.length > 1) {
+            return;
+        }
+        String fieldName = fieldNames[0];
+        for (DataField field : fields) {
+            if (!field.name().equals(fieldName)) {
+                continue;
+            }
+            boolean wasBlob = field.type().is(DataTypeRoot.BLOB);
+            boolean willBeBlob = newType.is(DataTypeRoot.BLOB);
+            if (wasBlob || willBeBlob) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Cannot change column type involving BLOB: [%s] %s -> %s",
+                                fieldName, field.type(), newType));
+            }
+            return;
+        }
+    }
+
     private abstract static class NestedColumnModifier {
 
         private final String[] updateFieldNames;
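The checks added to `SchemaManager` for ADD COLUMN and UPDATE COLUMN TYPE can be read as a small decision table. The sketch below restates them under stated assumptions: a hypothetical `Root` enum stands in for Paimon's `DataTypeRoot` (with `BYTES` represented as `VARBINARY`), and `BlobColumnChecksSketch` is a hypothetical class. It illustrates the rules only; field-id assignment, comment handling, and option updates are omitted.

```java
/** Hypothetical subset of type roots; BYTES is modeled as VARBINARY. */
enum Root { INT, BINARY, VARBINARY, BLOB }

/** Hypothetical sketch of the BLOB checks on ADD COLUMN and UPDATE COLUMN TYPE. */
final class BlobColumnChecksSketch {

    /** Returns the root the new column is stored with; hasDirective = a supported directive was parsed. */
    static Root addColumnType(String[] path, Root declared, boolean hasDirective) {
        if (hasDirective) {
            if (path.length != 1) {
                throw new IllegalArgumentException(
                        "BLOB directive cannot be used on a nested column " + String.join(".", path));
            }
            if (declared != Root.VARBINARY && declared != Root.BINARY && declared != Root.BLOB) {
                throw new IllegalArgumentException(
                        "Column " + path[0] + " must be of BYTES, BINARY or BLOB type, but was " + declared);
            }
            return Root.BLOB; // the declared binary type is replaced by BLOB
        }
        if (declared == Root.BLOB) {
            // Without a directive the storage mode (default vs. descriptor) would be ambiguous.
            throw new UnsupportedOperationException(
                    "Adding BLOB column " + String.join(".", path) + " requires a comment directive");
        }
        return declared; // ordinary column, type unchanged
    }

    /** UPDATE COLUMN TYPE on a top-level column: any change where either side is BLOB is refused. */
    static void checkTypeChange(String column, Root from, Root to) {
        if (from == Root.BLOB || to == Root.BLOB) {
            throw new UnsupportedOperationException(
                    "Cannot change column type involving BLOB: [" + column + "] " + from + " -> " + to);
        }
    }
}
```

The sketch keeps the exception types used in the diff: a directive on a nested or non-binary column fails `Preconditions.checkArgument` (an `IllegalArgumentException`), while a raw `BLOB` without a directive and any BLOB type change raise `UnsupportedOperationException`.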
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
index 1cbaf903f0..f230216a58 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
@@ -422,7 +422,8 @@ public class MultipleBlobTableTest extends TableTestBase {
         // Add new blob column f3
         catalog.alterTable(
                 identifier(),
-                Collections.singletonList(SchemaChange.addColumn("f3", DataTypes.BLOB())),
+                Collections.singletonList(
+                        SchemaChange.addColumn("f3", DataTypes.BLOB(), "__BLOB_FIELD", null)),
                 false);
 
         // Write more data with both f2 and f3
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java
new file mode 100644
index 0000000000..1813fe2ecf
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.CoreOptions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link BlobSchemaUtils}. */
+public class BlobSchemaUtilsTest {
+
+    @Test
+    public void testParseAddColumnComment() {
+        // null and non-directive comments are passthrough.
+        assertThat(BlobSchemaUtils.parseAddColumnComment(null)).isNull();
+        assertThat(BlobSchemaUtils.parseAddColumnComment("")).isNull();
+        assertThat(BlobSchemaUtils.parseAddColumnComment("normal user comment")).isNull();
+        // case-sensitive: lowercase is not a directive.
+        assertThat(BlobSchemaUtils.parseAddColumnComment("__blob_field; x")).isNull();
+
+        // bare BLOB_FIELD directive
+        BlobSchemaUtils.ParsedDirective bareBlob =
+                BlobSchemaUtils.parseAddColumnComment(BlobSchemaUtils.BLOB_FIELD_DIRECTIVE);
+        assertThat(bareBlob.optionKey()).isEqualTo(CoreOptions.BLOB_FIELD.key());
+        assertThat(bareBlob.realComment()).isNull();
+
+        // BLOB_FIELD with trailing semicolon only — still no real comment
+        BlobSchemaUtils.ParsedDirective trailingSemi =
+                BlobSchemaUtils.parseAddColumnComment(BlobSchemaUtils.BLOB_FIELD_DIRECTIVE + ";");
+        assertThat(trailingSemi.realComment()).isNull();
+
+        // BLOB_FIELD with real comment (note surrounding whitespace is trimmed).
+        BlobSchemaUtils.ParsedDirective withComment =
+                BlobSchemaUtils.parseAddColumnComment(
+                        BlobSchemaUtils.BLOB_FIELD_DIRECTIVE + ";   profile picture  ");
+        assertThat(withComment.optionKey()).isEqualTo(CoreOptions.BLOB_FIELD.key());
+        assertThat(withComment.realComment()).isEqualTo("profile picture");
+
+        // BLOB_DESCRIPTOR_FIELD directive
+        BlobSchemaUtils.ParsedDirective descriptor =
+                BlobSchemaUtils.parseAddColumnComment(
+                        BlobSchemaUtils.BLOB_DESCRIPTOR_FIELD_DIRECTIVE + "; desc text");
+        assertThat(descriptor.optionKey()).isEqualTo(CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
+        assertThat(descriptor.realComment()).isEqualTo("desc text");
+    }
+
+    @Test
+    public void testParseRejectsUnknownDirective() {
+        assertThatThrownBy(() -> BlobSchemaUtils.parseAddColumnComment("__BLOB_VIEW_FIELD; x"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unsupported BLOB directive");
+        assertThatThrownBy(() -> BlobSchemaUtils.parseAddColumnComment("__BLOB_UNKNOWN"))
+                .isInstanceOf(IllegalArgumentException.class);
+        // a __BLOB_FIELD prefix without a `;` boundary is not a valid directive.
+        assertThatThrownBy(() -> BlobSchemaUtils.parseAddColumnComment("__BLOB_FIELDX"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void testModifyBlobOptions() {
+        Map<String, String> opts = new HashMap<>();
+        BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_FIELD.key(), "a", opts);
+        assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a");
+
+        BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_FIELD.key(), "b", opts);
+        assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a,b");
+
+        BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "c", opts);
+        assertThat(opts).containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "c");
+
+        assertThatThrownBy(
+                        () ->
+                                BlobSchemaUtils.modifyBlobOptions(
+                                        "not-a-blob-option", "x", new HashMap<>()))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unsupported BLOB directive");
+    }
+
+    @Test
+    public void testModifyBlobOptionsMigratesLegacyFallbackKey() {
+        // legacy option holds the descriptor field; canonical key absent.
+        Map<String, String> opts = new HashMap<>();
+        opts.put("blob.stored-descriptor-fields", "legacy_col");
+
+        BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "new_col", opts);
+
+        // fallback value is migrated to canonical key, fallback key removed to avoid stale data.
+        assertThat(opts)
+                .containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "legacy_col,new_col");
+        assertThat(opts).doesNotContainKey("blob.stored-descriptor-fields");
+    }
+
+    @Test
+    public void testRemoveFromBlobOptions() {
+        // pre-populated with all 4 canonical keys + the legacy fallback for descriptor.
+        Map<String, String> opts = new HashMap<>();
+        opts.put(CoreOptions.BLOB_FIELD.key(), "a,b");
+        opts.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "b,c");
+        opts.put(CoreOptions.BLOB_VIEW_FIELD.key(), "b");
+        opts.put(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "b");
+        opts.put("blob.stored-descriptor-fields", "b,legacy");
+
+        BlobSchemaUtils.removeFromBlobOptions("b", opts);
+
+        // b removed from every csv; keys whose csv becomes empty are dropped.
+        assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a");
+        assertThat(opts).containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "c");
+        assertThat(opts).doesNotContainKey(CoreOptions.BLOB_VIEW_FIELD.key());
+        assertThat(opts).doesNotContainKey(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key());
+        assertThat(opts).containsEntry("blob.stored-descriptor-fields", "legacy");
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 58d82b0bc9..a27c1a1251 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -41,8 +41,10 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.LazyField;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -188,6 +190,223 @@ public class SchemaEvolutionTest {
                         columnName, identifier.getFullName());
     }
 
+    @Test
+    public void testAddBlobColumnViaCommentDirective() throws Exception {
+        // create table with one pre-existing BLOB column registered in blob-field, so we can
+        // also verify that ADD COLUMN appends to (rather than overwrites) the existing value.
+        Map<String, String> options = blobEnabledOptions();
+        options.put(CoreOptions.BLOB_FIELD.key(), "existing_col");
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(
+                                        new DataField[] {
+                                            new DataField(0, "k", DataTypes.INT()),
+                                            new DataField(
+                                                    1, "existing_col", DataTypes.BLOB().copy(true))
+                                        })
+                                .getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options,
+                        ""));
+
+        // bare directive: no user comment, appended to existing blob-field value.
+        // directive + user comment, SDK caller passes a BlobType directly (allowed when
+        // accompanied by a directive so the storage mode is explicit).
+        schemaManager.commitChanges(
+                ImmutableList.of(
+                        SchemaChange.addColumn("picture", DataTypes.BYTES(), "__BLOB_FIELD", null),
+                        SchemaChange.addColumn(
+                                "desc_col",
+                                DataTypes.BLOB(),
+                                "__BLOB_DESCRIPTOR_FIELD; descriptor comment",
+                                null)));
+
+        TableSchema latest = schemaManager.latest().get();
+
+        DataField picture =
+                latest.fields().stream().filter(f -> f.name().equals("picture")).findFirst().get();
+        assertThat(picture.type().getTypeRoot()).isEqualTo(DataTypeRoot.BLOB);
+        assertThat(picture.description()).isNull();
+
+        DataField desc =
+                latest.fields().stream().filter(f -> f.name().equals("desc_col")).findFirst().get();
+        assertThat(desc.type().getTypeRoot()).isEqualTo(DataTypeRoot.BLOB);
+        assertThat(desc.description()).isEqualTo("descriptor comment");
+
+        assertThat(latest.options().get(CoreOptions.BLOB_FIELD.key()))
+                .isEqualTo("existing_col,picture");
+        assertThat(latest.options().get(CoreOptions.BLOB_DESCRIPTOR_FIELD.key()))
+                .isEqualTo("desc_col");
+    }
+
+    @Test
+    public void testAddBlobColumnErrors() throws Exception {
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(
+                                        new DataField[] {
+                                            new DataField(0, "k", DataTypes.INT()),
+                                            new DataField(
+                                                    1,
+                                                    "nested",
+                                                    DataTypes.ROW(
+                                                            new DataField(2, "a", DataTypes.INT())))
+                                        })
+                                .getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        ""));
+
+        // non-BYTES/BINARY type rejected.
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.addColumn(
+                                                        "bad",
+                                                        DataTypes.INT(),
+                                                        "__BLOB_FIELD",
+                                                        null))))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("must be of BYTES, BINARY or BLOB type");
+
+        // nested column rejected.
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.addColumn(
+                                                        new String[] {"nested", "blob"},
+                                                        DataTypes.BYTES(),
+                                                        "__BLOB_FIELD",
+                                                        null))))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("nested column");
+
+        // unknown __BLOB directive rejected (e.g. __BLOB_VIEW_FIELD is not supported).
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.addColumn(
+                                                        "x",
+                                                        DataTypes.BYTES(),
+                                                        "__BLOB_VIEW_FIELD",
+                                                        null))))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unsupported BLOB directive");
+
+        // raw BlobType without any directive rejected — SDK callers must go through the
+        // directive path so the storage mode (blob-field vs blob-descriptor-field) is explicit.
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.addColumn(
+                                                        "raw_blob", DataTypes.BLOB(), null, null))))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("requires a comment directive");
+
+        // SET OPTION on blob-field is rejected (the option is @Immutable).
+        TableSchema oldSchema = schemaManager.latest().get();
+        LazyField<Boolean> hasSnapshots = new LazyField<>(() -> true);
+        LazyField<Identifier> lazyId = new LazyField<>(() -> identifier);
+        assertThatThrownBy(
+                        () ->
+                                SchemaManager.generateTableSchema(
+                                        oldSchema,
+                                        Collections.singletonList(
+                                                SchemaChange.setOption(
+                                                        CoreOptions.BLOB_FIELD.key(), "k")),
+                                        hasSnapshots,
+                                        lazyId))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining(CoreOptions.BLOB_FIELD.key());
+    }
+
+    @Test
+    public void testDropBlobColumnCleansOptions() throws Exception {
+        // table with one descriptor BLOB col registered in both blob-descriptor-field and
+        // blob-external-storage-field (subset rule), and one normal blob col in blob-field.
+        Map<String, String> options = blobEnabledOptions();
+        options.put(CoreOptions.BLOB_FIELD.key(), "pic");
+        options.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "ext");
+        options.put(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "ext");
+        options.put(CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key(), "/tmp/blob-ext");
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(
+                                        new DataField[] {
+                                            new DataField(0, "k", DataTypes.INT()),
+                                            new DataField(1, "pic", DataTypes.BLOB().copy(true)),
+                                            new DataField(2, "ext", DataTypes.BLOB().copy(true))
+                                        })
+                                .getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options,
+                        ""));
+
+        // drop the descriptor BLOB column — it must vanish from both descriptor-field and
+        // external-storage-field; the other BLOB column is untouched.
+        schemaManager.commitChanges(Collections.singletonList(SchemaChange.dropColumn("ext")));
+
+        TableSchema latest = schemaManager.latest().get();
+        assertThat(latest.options().get(CoreOptions.BLOB_FIELD.key())).isEqualTo("pic");
+        assertThat(latest.options()).doesNotContainKey(CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
+        assertThat(latest.options())
+                .doesNotContainKey(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key());
+    }
+
+    @Test
+    public void testUpdateColumnTypeOnBlobIsRejected() throws Exception {
+        Map<String, String> options = blobEnabledOptions();
+        options.put(CoreOptions.BLOB_FIELD.key(), "pic");
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(
+                                        new DataField[] {
+                                            new DataField(0, "k", DataTypes.INT()),
+                                            new DataField(1, "pic", DataTypes.BLOB().copy(true)),
+                                            new DataField(2, "raw", DataTypes.BYTES())
+                                        })
+                                .getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options,
+                        ""));
+
+        // BLOB -> BYTES rejected.
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.updateColumnType(
+                                                        "pic", DataTypes.BYTES()))))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("BLOB");
+
+        // BYTES -> BLOB rejected (must be added via ADD COLUMN directive instead).
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.updateColumnType(
+                                                        "raw", DataTypes.BLOB()))))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("BLOB");
+    }
+
+    private static Map<String, String> blobEnabledOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        return options;
+    }
+
     @Test
     public void testUpdateFieldType() throws Exception {
         Schema schema =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index d51c1d006a..8e6ae0b123 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1839,4 +1839,25 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
                                 UnsupportedOperationException.class,
                                 "Cannot drop primary keys on a non-empty table."));
     }
+
+    private static final String BLOB_TABLE_OPTIONS =
+            "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'bucket'='-1'";
+
+    @Test
+    public void testAddBlobColumnViaCommentDirective() {
+        sql("CREATE TABLE T (id INT, data STRING) WITH (" + BLOB_TABLE_OPTIONS + ")");
+
+        // bare directive — no user comment
+        sql("ALTER TABLE T ADD desc_col BYTES COMMENT '__BLOB_DESCRIPTOR_FIELD'");
+        // directive + user comment
+        sql("ALTER TABLE T ADD picture BYTES COMMENT '__BLOB_FIELD; profile picture'");
+
+        String createSql = sql("SHOW CREATE TABLE T").get(0).toString();
+        assertThat(createSql).doesNotContain("__BLOB");
+        assertThat(createSql).contains("`desc_col`");
+        assertThat(createSql).contains("`picture`");
+        assertThat(createSql).contains("COMMENT 'profile picture'");
+        assertThat(createSql).contains("'blob-field' = 'picture'");
+        assertThat(createSql).contains("'blob-descriptor-field' = 'desc_col'");
+    }
 }
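For readers skimming the patch, the directive handling exercised by these tests can be summarized as follows: a leading `__BLOB_FIELD` or `__BLOB_DESCRIPTOR_FIELD` token selects the storage mode, an optional `;` separator introduces the user comment that survives into `SHOW CREATE TABLE`, and any other `__BLOB...` token is rejected with "Unsupported BLOB directive". The Java below is a minimal, hypothetical sketch of that parsing; `BlobDirectiveSketch`, `parse`, `Mode`, and `Parsed` are invented for illustration and are not the `BlobSchemaUtils` code added by this commit. It assumes the directive must open the comment and that `;` is the only separator, which matches the test inputs but may differ from the real parser. The type check ("must be of BYTES, BINARY or BLOB type") is deliberately left out.

```java
import java.util.Optional;

/**
 * Hypothetical sketch, not Paimon's BlobSchemaUtils: splits a column comment such as
 * "__BLOB_FIELD; profile picture" into a storage directive and the remaining user comment.
 */
public class BlobDirectiveSketch {

    /** The two storage modes exercised by the tests, mapped to their table option keys. */
    enum Mode {
        BLOB_FIELD("blob-field"),
        BLOB_DESCRIPTOR_FIELD("blob-descriptor-field");

        final String optionKey;

        Mode(String optionKey) {
            this.optionKey = optionKey;
        }
    }

    /** Parse result: the selected mode plus the user comment (null when none was given). */
    static final class Parsed {
        final Mode mode;
        final String userComment;

        Parsed(Mode mode, String userComment) {
            this.mode = mode;
            this.userComment = userComment;
        }
    }

    // Assumption: every directive starts with this prefix and opens the comment.
    private static final String PREFIX = "__BLOB";

    /** Returns empty when the comment carries no __BLOB directive at all. */
    static Optional<Parsed> parse(String comment) {
        if (comment == null || !comment.startsWith(PREFIX)) {
            return Optional.empty();
        }
        // Assumption: ';' separates the directive from the user comment.
        int sep = comment.indexOf(';');
        String directive = (sep < 0 ? comment : comment.substring(0, sep)).trim();
        String rest = sep < 0 ? "" : comment.substring(sep + 1).trim();
        Mode mode;
        switch (directive) {
            case "__BLOB_FIELD":
                mode = Mode.BLOB_FIELD;
                break;
            case "__BLOB_DESCRIPTOR_FIELD":
                mode = Mode.BLOB_DESCRIPTOR_FIELD;
                break;
            default:
                throw new IllegalArgumentException("Unsupported BLOB directive: " + directive);
        }
        // A bare directive leaves no user comment, so the column description stays null.
        return Optional.of(new Parsed(mode, rest.isEmpty() ? null : rest));
    }

    public static void main(String[] args) {
        Parsed p = parse("__BLOB_FIELD; profile picture").get();
        System.out.println(p.mode.optionKey + " / " + p.userComment);
    }
}
```

Returning `Optional.empty()` for ordinary comments keeps non-BLOB columns on the normal ADD COLUMN path, while an unrecognized `__BLOB` token fails loudly instead of silently creating a plain BYTES column.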
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index fc8b4adb6e..7afe3c76cf 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -1080,4 +1080,38 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                 .containsExactlyInAnyOrder(
                         "[1,APPLE,1000000000000]", "[2,cat,200]", "[3,FLOWER,3000000000000]");
     }
+
+    private static final String BLOB_TABLE_PROPS =
+            "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'bucket'='-1'";
+
+    @Test
+    public void testAddBlobColumnViaCommentDirective() {
+        String table = "paimon.default.blob_add_col";
+        spark.sql(
+                "CREATE TABLE "
+                        + table
+                        + " (id INT, data STRING) TBLPROPERTIES ("
+                        + BLOB_TABLE_PROPS
+                        + ")");
+
+        // bare directive — no user comment
+        spark.sql(
+                "ALTER TABLE "
+                        + table
+                        + " ADD COLUMN desc_col BINARY COMMENT '__BLOB_DESCRIPTOR_FIELD'");
+        // directive + user comment
+        spark.sql(
+                "ALTER TABLE "
+                        + table
+                        + " ADD COLUMN picture BINARY COMMENT '__BLOB_FIELD; profile picture'");
+
+        String createSql =
+                spark.sql("SHOW CREATE TABLE " + table).collectAsList().get(0).toString();
+        assertThat(createSql).doesNotContain("__BLOB");
+        assertThat(createSql).contains("desc_col");
+        assertThat(createSql).contains("picture");
+        assertThat(createSql).contains("profile picture");
+        assertThat(createSql).contains("'blob-field' = 'picture'");
+        assertThat(createSql).contains("'blob-descriptor-field' = 'desc_col'");
+    }
 }
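`testDropBlobColumnCleansOptions` and the `existing_col,picture` assertion earlier in this patch imply that `blob-field`, `blob-descriptor-field`, and the external-storage field list are kept as comma-separated column names: ADD COLUMN appends a name, DROP COLUMN removes it from every list that mentions it, and a list that becomes empty disappears from the options entirely. A minimal, hypothetical sketch of that bookkeeping over a plain `Map<String, String>` follows; `BlobOptionListSketch`, `appendColumn`, and `removeColumn` are illustrative names, not Paimon APIs, and the literal key `blob-external-storage-field` is an assumption, since the patch only refers to it through `CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch, not Paimon's SchemaManager: keeps comma-separated column-list
 * options in sync with ADD COLUMN and DROP COLUMN.
 */
public class BlobOptionListSketch {

    /** Appends a column to the list under key, e.g. "existing_col" becomes "existing_col,picture". */
    static void appendColumn(Map<String, String> options, String key, String column) {
        String current = options.get(key);
        options.put(key, current == null || current.isEmpty() ? column : current + "," + column);
    }

    /** Removes a column from the list under key and drops the key once the list is empty. */
    static void removeColumn(Map<String, String> options, String key, String column) {
        String current = options.get(key);
        if (current == null) {
            return; // nothing registered under this key
        }
        List<String> remaining = new ArrayList<>();
        for (String name : current.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty() && !trimmed.equals(column)) {
                remaining.add(trimmed);
            }
        }
        if (remaining.isEmpty()) {
            options.remove(key);
        } else {
            options.put(key, String.join(",", remaining));
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("blob-field", "pic");
        options.put("blob-descriptor-field", "ext");
        // Assumed key string for CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.
        options.put("blob-external-storage-field", "ext");

        // Dropping "ext" clears it from every list that references it; "pic" is untouched.
        for (String key :
                Arrays.asList("blob-field", "blob-descriptor-field", "blob-external-storage-field")) {
            removeColumn(options, key, "ext");
        }
        System.out.println(options);
    }
}
```

Removing the key rather than leaving an empty string mirrors the `doesNotContainKey` assertions in the drop test, so a table that no longer has descriptor columns looks the same as one that never had them.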