This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 46b120da29 [core] support adding blob column through comments (#7996)
46b120da29 is described below
commit 46b120da2918ea51d0f4c51768fed1184f4356d2
Author: Faiz <[email protected]>
AuthorDate: Fri May 29 13:25:49 2026 +0800
[core] support adding blob column through comments (#7996)
---
docs/docs/append-table/blob.mdx | 50 +++++
.../main/java/org/apache/paimon/CoreOptions.java | 1 +
.../org/apache/paimon/schema/BlobSchemaUtils.java | 191 ++++++++++++++++++
.../org/apache/paimon/schema/SchemaManager.java | 71 ++++++-
.../paimon/append/MultipleBlobTableTest.java | 3 +-
.../apache/paimon/schema/BlobSchemaUtilsTest.java | 134 +++++++++++++
.../apache/paimon/table/SchemaEvolutionTest.java | 219 +++++++++++++++++++++
.../apache/paimon/flink/SchemaChangeITCase.java | 21 ++
.../paimon/spark/SparkSchemaEvolutionITCase.java | 34 ++++
9 files changed, 720 insertions(+), 4 deletions(-)
diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx
index dfe78709b2..a3d4eedca5 100644
--- a/docs/docs/append-table/blob.mdx
+++ b/docs/docs/append-table/blob.mdx
@@ -268,6 +268,56 @@ CREATE TABLE image_table (
</Tabs>
+### Adding a Blob Column to an Existing Table
+
+A BLOB column can be added to an existing blob-enabled table with a single `ALTER TABLE ADD COLUMN` statement. Because most SQL engines have no `BLOB` type in their DDL, declare the new column as `BYTES` or `BINARY` and select the BLOB storage mode with a **comment directive**:
+
+- `__BLOB_FIELD`: store the column as a default blob (raw bytes are written to `.blob` files), equivalent to listing it in `blob-field`.
+- `__BLOB_DESCRIPTOR_FIELD`: store the column as a descriptor-only blob kept inline in the data files, equivalent to listing it in `blob-descriptor-field`.
+
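+Anything after the optional `;` separator becomes the column's actual comment, with surrounding whitespace trimmed. Directives are case-sensitive, so `__blob_field` is treated as an ordinary comment.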
+
+<Tabs groupId="blob-add-column">
+
+<TabItem value="flink-sql" label="Flink SQL">
+
+```sql
+-- Add a blob-field column (no extra user comment)
+ALTER TABLE image_table ADD picture BYTES COMMENT '__BLOB_FIELD';
+
+-- Add a descriptor-field column with a real user comment
+ALTER TABLE image_table
+ ADD video BYTES COMMENT '__BLOB_DESCRIPTOR_FIELD; promotional video';
+```
+
+</TabItem>
+
+<TabItem value="spark-sql" label="Spark SQL">
+
+```sql
+-- Add a blob-field column (no extra user comment)
+ALTER TABLE image_table ADD COLUMN picture BINARY COMMENT '__BLOB_FIELD';
+
+-- Add a descriptor-field column with a real user comment
+ALTER TABLE image_table
+ ADD COLUMN video BINARY COMMENT '__BLOB_DESCRIPTOR_FIELD; promotional video';
+```
+
+</TabItem>
+
+</Tabs>
+
+Paimon converts the declared `BYTES`/`BINARY` type to `BLOB`, appends the new column to the matching option (`blob-field` or `blob-descriptor-field`), and stores the trimmed user comment, without the directive, on the column. All of this happens in a single atomic schema change, so there is no need to `SET` an option first and then run `ADD COLUMN`.
+
+#### Limitations
+
+1. **The storage mode must be explicit.** Only `__BLOB_FIELD` and `__BLOB_DESCRIPTOR_FIELD` are accepted. Columns for `blob-view-field` and `blob-external-storage-field` cannot be added this way; those options must be configured when the table is created.
+2. **Unknown `__BLOB`-prefixed directives are rejected**, so a typo such as `__BLOB_FIELDX` fails instead of silently being stored as a regular comment.
+3. **The column type must be `BYTES`/`BINARY` (or `BLOB` when calling the Java API directly).** A BLOB directive on any other type, or on a nested column, is rejected.
+4. **A raw BLOB type without a directive is rejected.** When a Java SDK caller passes `DataTypes.BLOB()` to `SchemaChange.addColumn`, the directive is still required so that the storage mode (default or descriptor) is unambiguous.
+5. **Existing columns cannot be converted to or from BLOB.** `ALTER TABLE ... CHANGE` / `ALTER COLUMN ... TYPE` between BLOB and any other type is rejected in both directions, because either conversion would break data that has already been written.
+6. **Dropping a BLOB column cleans up the options automatically.** On `ALTER TABLE ... DROP COLUMN`, Paimon removes the dropped name from `blob-field`, `blob-descriptor-field`, `blob-view-field` and `blob-external-storage-field` (including their legacy fallback keys) and deletes any option whose list becomes empty, so the remaining options stay consistent with the schema.
+
### Inserting Blob Data
<Tabs groupId="blob-insert">
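The directive rules in the `blob.mdx` hunk above can be tried out without a Paimon dependency. The following minimal Java sketch restates them, assuming only what the documentation and `BlobSchemaUtils.parseAddColumnComment` describe. The class `BlobDirectiveSketch` and its `Directive` holder are hypothetical and are not part of Paimon's API.

```java
import java.util.Optional;

// Hypothetical sketch, not Paimon's API: mirrors the documented directive rules only.
class BlobDirectiveSketch {

    /** A recognized directive: the option the column joins plus its leftover comment. */
    static final class Directive {
        final String optionKey;   // "blob-field" or "blob-descriptor-field"
        final String realComment; // null when nothing but whitespace follows the ';'

        Directive(String optionKey, String realComment) {
            this.optionKey = optionKey;
            this.realComment = realComment;
        }
    }

    // Each supported marker paired with the table option it updates.
    private static final String[][] DIRECTIVES = {
        {"__BLOB_DESCRIPTOR_FIELD", "blob-descriptor-field"},
        {"__BLOB_FIELD", "blob-field"}
    };

    /** Empty for an ordinary comment; throws for an unsupported "__BLOB..." directive. */
    static Optional<Directive> parse(String comment) {
        if (comment == null || !comment.startsWith("__BLOB")) {
            return Optional.empty(); // regular comment, stored unchanged
        }
        String trimmed = comment.trim();
        for (String[] d : DIRECTIVES) {
            String marker = d[0];
            if (!trimmed.startsWith(marker)) {
                continue;
            }
            if (trimmed.length() == marker.length()) {
                return Optional.of(new Directive(d[1], null)); // bare directive
            }
            if (trimmed.charAt(marker.length()) == ';') {
                String rest = trimmed.substring(marker.length() + 1).trim();
                return Optional.of(new Directive(d[1], rest.isEmpty() ? null : rest));
            }
            // e.g. "__BLOB_FIELDX": the marker is not followed by ';', so it does not match.
        }
        throw new IllegalArgumentException(
                "Unsupported BLOB directive in column comment: '" + comment + "'");
    }

    public static void main(String[] args) {
        Directive d = parse("__BLOB_DESCRIPTOR_FIELD; promotional video").get();
        // prints: blob-descriptor-field -> promotional video
        System.out.println(d.optionKey + " -> " + d.realComment);
        // prints: false (directives are case-sensitive)
        System.out.println(parse("__blob_field; x").isPresent());
    }
}
```

As in the committed parser, the sketch only inspects a comment that starts with `__BLOB` before trimming. An ordinary comment is therefore never rewritten.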
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 6b0ef75ff8..dc5755e1ba 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2308,6 +2308,7 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("Format table commit hive sync uri.");
+ @Immutable
public static final ConfigOption<String> BLOB_FIELD =
key("blob-field")
.stringType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java b/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java
new file mode 100644
index 0000000000..18e388ecca
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.FallbackKey;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** Utilities for BLOB-related schema evolution (ALTER TABLE ADD COLUMN comment directives). */
+public final class BlobSchemaUtils {
+
+ public static final String BLOB_FIELD_DIRECTIVE = "__BLOB_FIELD";
+ public static final String BLOB_DESCRIPTOR_FIELD_DIRECTIVE = "__BLOB_DESCRIPTOR_FIELD";
+
+ private BlobSchemaUtils() {}
+
+ /**
+ * Parses the comment of an {@code ALTER TABLE ADD COLUMN} statement. Returns {@code null} when
+ * the comment is a regular user comment; returns a {@link ParsedDirective} when the comment
+ * begins with a supported BLOB directive. Throws {@link IllegalArgumentException} when the
+ * comment begins with {@code __BLOB} but is not one of the supported directives.
+ */
+ @Nullable
+ public static ParsedDirective parseAddColumnComment(@Nullable String comment) {
+ if (comment == null || !comment.startsWith("__BLOB")) {
+ return null;
+ }
+ comment = StringUtils.trim(comment);
+ String optionKey = matchDirective(comment, BLOB_DESCRIPTOR_FIELD_DIRECTIVE);
+ String marker = BLOB_DESCRIPTOR_FIELD_DIRECTIVE;
+ if (optionKey == null) {
+ optionKey = matchDirective(comment, BLOB_FIELD_DIRECTIVE);
+ marker = BLOB_FIELD_DIRECTIVE;
+ }
+ Preconditions.checkArgument(
+ optionKey != null,
+ "Unsupported BLOB directive in column comment: '%s'. Supported directives are "
+ + "'%s' and '%s'.",
+ comment,
+ BLOB_FIELD_DIRECTIVE,
+ BLOB_DESCRIPTOR_FIELD_DIRECTIVE);
+ String realComment =
+ comment.length() == marker.length()
+ ? null
+ : comment.substring(marker.length() + 1).trim();
+ if (realComment != null && realComment.isEmpty()) {
+ realComment = null;
+ }
+ return new ParsedDirective(optionKey, realComment);
+ }
+
+ @Nullable
+ private static String matchDirective(String comment, String marker) {
+ if (!comment.startsWith(marker)) {
+ return null;
+ }
+ if (comment.length() == marker.length()) {
+ return optionKeyFor(marker);
+ }
+ return comment.charAt(marker.length()) == ';' ? optionKeyFor(marker) : null;
+ }
+
+ private static String optionKeyFor(String marker) {
+ if (BLOB_FIELD_DIRECTIVE.equals(marker)) {
+ return CoreOptions.BLOB_FIELD.key();
+ } else if (BLOB_DESCRIPTOR_FIELD_DIRECTIVE.equals(marker)) {
+ return CoreOptions.BLOB_DESCRIPTOR_FIELD.key();
+ } else {
+ throw new IllegalArgumentException("Unsupported BLOB directive: " + marker);
+ }
+ }
+
+ /**
+ * Appends {@code fieldName} to the BLOB option {@code blobKey} so that {@code blob-field} and
+ * {@code blob-descriptor-field} stay consistent with the actual schema. If the canonical key is
+ * empty but a fallback key holds the value (e.g. legacy {@code blob.stored-descriptor-fields}),
+ * the fallback value is migrated to the canonical key before appending so old entries are not
+ * shadowed.
+ */
+ public static void modifyBlobOptions(
+ String blobKey, String fieldName, Map<String, String> options) {
+ ConfigOption<String> option;
+ if (CoreOptions.BLOB_FIELD.key().equals(blobKey)) {
+ option = CoreOptions.BLOB_FIELD;
+ } else if (CoreOptions.BLOB_DESCRIPTOR_FIELD.key().equals(blobKey)) {
+ option = CoreOptions.BLOB_DESCRIPTOR_FIELD;
+ } else {
+ throw new IllegalArgumentException("Unsupported BLOB directive: " + blobKey);
+ }
+
+ String existing = options.get(blobKey);
+ if (existing == null || existing.isEmpty()) {
+ // migrate legacy fallback keys to current canonical key
+ for (FallbackKey fk : option.fallbackKeys()) {
+ String fallbackValue = options.remove(fk.getKey());
+ if (fallbackValue != null && !fallbackValue.isEmpty()) {
+ existing = fallbackValue;
+ break;
+ }
+ }
+ }
+ String newValue = existing == null ? fieldName : existing + "," + fieldName;
+ options.put(blobKey, newValue);
+ }
+
+ /**
+ * Removes {@code fieldName} from every BLOB-related comma-separated option (and the legacy
+ * fallback key for {@code blob-descriptor-field}). When the resulting csv becomes empty the
+ * option key is dropped entirely. Used when a BLOB column is being dropped.
+ */
+ public static void removeFromBlobOptions(String fieldName, Map<String, String> options) {
+ ConfigOption<String>[] keys =
+ new ConfigOption[] {
+ CoreOptions.BLOB_FIELD,
+ CoreOptions.BLOB_DESCRIPTOR_FIELD,
+ CoreOptions.BLOB_VIEW_FIELD,
+ CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD
+ };
+ for (ConfigOption<String> option : keys) {
+ removeFromCsvOption(option.key(), fieldName, options);
+ for (FallbackKey fk : option.fallbackKeys()) {
+ removeFromCsvOption(fk.getKey(), fieldName, options);
+ }
+ }
+ }
+
+ private static void removeFromCsvOption(
+ String key, String fieldName, Map<String, String> options) {
+ String existing = options.get(key);
+ if (existing == null || existing.isEmpty()) {
+ return;
+ }
+ StringBuilder sb = new StringBuilder();
+ for (String v : existing.split(",")) {
+ String trimmed = v.trim();
+ if (trimmed.isEmpty() || trimmed.equals(fieldName)) {
+ continue;
+ }
+ if (sb.length() > 0) {
+ sb.append(',');
+ }
+ sb.append(trimmed);
+ }
+ if (sb.length() == 0) {
+ options.remove(key);
+ } else {
+ options.put(key, sb.toString());
+ }
+ }
+
+ /** Parsed BLOB directive: the option key to update and the user-facing comment. */
+ public static final class ParsedDirective {
+ private final String optionKey;
+ @Nullable private final String realComment;
+
+ private ParsedDirective(String optionKey, @Nullable String realComment) {
+ this.optionKey = optionKey;
+ this.realComment = realComment;
+ }
+
+ public String optionKey() {
+ return optionKey;
+ }
+
+ @Nullable
+ public String realComment() {
+ return realComment;
+ }
+ }
+}
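The bookkeeping done by `modifyBlobOptions` and `removeFromBlobOptions` can be traced on a plain map. The sketch below is a simplified, dependency-free restatement, not the committed code. The class `BlobOptionsSketch`, its `append`/`remove` methods, and passing the fallback key explicitly are assumptions made for illustration. The sketch also treats an empty existing value the same as a missing one, so it never produces a leading comma.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch of keeping the BLOB csv options in step with ADD/DROP COLUMN.
class BlobOptionsSketch {

    // Legacy key that the Javadoc above names as a fallback for blob-descriptor-field.
    static final String LEGACY_DESCRIPTOR_KEY = "blob.stored-descriptor-fields";

    /** ADD COLUMN: append the field, first migrating a legacy value if the key is unset. */
    static void append(String key, String fallbackKey, String field, Map<String, String> options) {
        String existing = options.get(key);
        if ((existing == null || existing.isEmpty()) && fallbackKey != null) {
            String legacy = options.remove(fallbackKey); // the legacy key is dropped either way
            if (legacy != null && !legacy.isEmpty()) {
                existing = legacy;
            }
        }
        boolean empty = existing == null || existing.isEmpty();
        options.put(key, empty ? field : existing + "," + field);
    }

    /** DROP COLUMN: remove the field from each csv option; delete options that become empty. */
    static void remove(String field, Map<String, String> options, String... keys) {
        for (String key : keys) {
            String existing = options.get(key);
            if (existing == null || existing.isEmpty()) {
                continue;
            }
            StringJoiner kept = new StringJoiner(",");
            for (String v : existing.split(",")) {
                String name = v.trim();
                if (!name.isEmpty() && !name.equals(field)) {
                    kept.add(name);
                }
            }
            if (kept.length() == 0) {
                options.remove(key);
            } else {
                options.put(key, kept.toString());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put(LEGACY_DESCRIPTOR_KEY, "old_video");
        append("blob-field", null, "picture", options);
        append("blob-descriptor-field", LEGACY_DESCRIPTOR_KEY, "video", options);
        // prints: {blob-field=picture, blob-descriptor-field=old_video,video}
        System.out.println(options);

        remove("video", options, "blob-field", "blob-descriptor-field", LEGACY_DESCRIPTOR_KEY);
        // prints: {blob-field=picture, blob-descriptor-field=old_video}
        System.out.println(options);
    }
}
```

A `LinkedHashMap` is used here only so that the printed order is predictable.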
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 670960889d..09ddeb14b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -39,6 +39,7 @@ import org.apache.paimon.schema.SchemaChange.UpdateComment;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.SchemaModification;
import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeCasts;
@@ -342,8 +343,46 @@ public class SchemaManager implements Serializable {
"Column %s cannot specify NOT NULL in the %s table.",
String.join(".", addColumn.fieldNames()),
lazyIdentifier.get().getFullName());
+
+ BlobSchemaUtils.ParsedDirective blobDirective =
+ BlobSchemaUtils.parseAddColumnComment(addColumn.description());
+ DataType requestedDataType = addColumn.dataType();
+ String effectiveComment = addColumn.description();
+ // try to convert to BLOB type
+ if (blobDirective != null) {
+ Preconditions.checkArgument(
+ addColumn.fieldNames().length == 1,
+ "BLOB directive cannot be used on a nested column %s.",
+ String.join(".", addColumn.fieldNames()));
+ DataTypeRoot root = requestedDataType.getTypeRoot();
+ Preconditions.checkArgument(
+ root == DataTypeRoot.VARBINARY
+ || root == DataTypeRoot.BINARY
+ || root == DataTypeRoot.BLOB,
+ "Column %s declared with a BLOB directive must be of BYTES, "
+ + "BINARY or BLOB type, but was %s.",
+ addColumn.fieldNames()[0],
+ requestedDataType);
+ requestedDataType = new BlobType(requestedDataType.isNullable());
+ effectiveComment = blobDirective.realComment();
+
+ BlobSchemaUtils.modifyBlobOptions(
+ blobDirective.optionKey(), addColumn.fieldNames()[0], newOptions);
+ } else if (requestedDataType.is(DataTypeRoot.BLOB)) {
+ // Adding a BLOB column without a comment directive is not permitted,
+ // since the storage mode (native blob or descriptor blob) would be unknown.
+ throw new UnsupportedOperationException(
+ String.format(
+ "Adding BLOB column %s requires a comment directive ('%s' "
+ + "or '%s') so the storage mode is explicit.",
+ String.join(".", addColumn.fieldNames()),
+ BlobSchemaUtils.BLOB_FIELD_DIRECTIVE,
+ BlobSchemaUtils.BLOB_DESCRIPTOR_FIELD_DIRECTIVE));
+ }
+
int id = highestFieldId.incrementAndGet();
- DataType dataType = ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
+ DataType dataType = ReassignFieldId.reassign(requestedDataType, highestFieldId);
+ String storedComment = effectiveComment;
new NestedColumnModifier(addColumn.fieldNames(), lazyIdentifier) {
@Override
protected void updateLastColumn(
@@ -352,8 +391,7 @@ public class SchemaManager implements Serializable {
Catalog.ColumnNotExistException {
assertColumnNotExists(newFields, fieldName, lazyIdentifier);
- DataField dataField =
- new DataField(id, fieldName, dataType, addColumn.description());
+ DataField dataField = new DataField(id, fieldName, dataType, storedComment);
// key: name ; value : index
Map<String, Integer> map = new HashMap<>();
@@ -435,6 +473,9 @@ public class SchemaManager implements Serializable {
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
dropColumnValidation(oldTableSchema, drop);
+ if (drop.fieldNames().length == 1) {
+ BlobSchemaUtils.removeFromBlobOptions(drop.fieldNames()[0], newOptions);
+ }
new NestedColumnModifier(drop.fieldNames(), lazyIdentifier) {
@Override
protected void updateLastColumn(
@@ -451,6 +492,8 @@ public class SchemaManager implements Serializable {
UpdateColumnType update = (UpdateColumnType) change;
assertNotUpdatingPartitionKeys(oldTableSchema, update.fieldNames(), "update");
assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
+ assertNotChangingBlobColumnType(
+ newFields, update.fieldNames(), update.newDataType());
updateNestedColumn(
newFields,
update.fieldNames(),
@@ -923,6 +966,28 @@ public class SchemaManager implements Serializable {
}
}
+ private static void assertNotChangingBlobColumnType(
+ List<DataField> fields, String[] fieldNames, DataType newType) {
+ if (fieldNames.length > 1) {
+ return;
+ }
+ String fieldName = fieldNames[0];
+ for (DataField field : fields) {
+ if (!field.name().equals(fieldName)) {
+ continue;
+ }
+ boolean wasBlob = field.type().is(DataTypeRoot.BLOB);
+ boolean willBeBlob = newType.is(DataTypeRoot.BLOB);
+ if (wasBlob || willBeBlob) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot change column type involving BLOB: [%s] %s -> %s",
+ fieldName, field.type(), newType));
+ }
+ return;
+ }
+ }
+
private abstract static class NestedColumnModifier {
private final String[] updateFieldNames;
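The checks added to `SchemaManager` for `AddColumn` and `UpdateColumnType` reduce to a small decision table. The following Java sketch models that table with a hypothetical `TypeRoot` enum standing in for Paimon's `DataTypeRoot`. `BYTES` is a VARBINARY type, which is why the committed check accepts the VARBINARY, BINARY and BLOB roots. `BlobColumnRulesSketch` is illustrative only and omits nullability and field-ID handling.

```java
// Hypothetical sketch of the BLOB rules for ADD COLUMN and column type changes; not Paimon's API.
class BlobColumnRulesSketch {

    // Simplified stand-in for Paimon's DataTypeRoot with just the roots these rules inspect.
    enum TypeRoot { INT, VARCHAR, BINARY, VARBINARY, BLOB }

    /** Returns the type root stored for a new column, or throws if the request is rejected. */
    static TypeRoot resolveAddColumn(String[] path, TypeRoot requested, boolean hasDirective) {
        if (!hasDirective) {
            if (requested == TypeRoot.BLOB) {
                // Without a directive the storage mode (default vs descriptor) is ambiguous.
                throw new UnsupportedOperationException(
                        "Adding a BLOB column requires a comment directive");
            }
            return requested; // ordinary column: type kept as declared
        }
        if (path.length != 1) {
            throw new IllegalArgumentException("BLOB directive cannot be used on a nested column");
        }
        if (requested != TypeRoot.BINARY
                && requested != TypeRoot.VARBINARY
                && requested != TypeRoot.BLOB) {
            throw new IllegalArgumentException(
                    "A column with a BLOB directive must be of BYTES, BINARY or BLOB type");
        }
        return TypeRoot.BLOB; // BYTES/BINARY are stored as BLOB
    }

    /** Type change on a top-level column: refused whenever either side is BLOB. */
    static void checkTypeChange(TypeRoot from, TypeRoot to) {
        if (from == TypeRoot.BLOB || to == TypeRoot.BLOB) {
            throw new UnsupportedOperationException(
                    "Cannot change column type involving BLOB: " + from + " -> " + to);
        }
    }

    public static void main(String[] args) {
        // A BYTES column with a directive is stored as BLOB; prints BLOB
        System.out.println(resolveAddColumn(new String[] {"picture"}, TypeRoot.VARBINARY, true));
        try {
            checkTypeChange(TypeRoot.VARBINARY, TypeRoot.BLOB);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Like `assertNotChangingBlobColumnType`, `checkTypeChange` also refuses BLOB-to-BLOB updates. It tests whether either side is BLOB, not whether the root actually changes.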
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
index 1cbaf903f0..f230216a58 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
@@ -422,7 +422,8 @@ public class MultipleBlobTableTest extends TableTestBase {
// Add new blob column f3
catalog.alterTable(
identifier(),
- Collections.singletonList(SchemaChange.addColumn("f3", DataTypes.BLOB())),
+ Collections.singletonList(
+ SchemaChange.addColumn("f3", DataTypes.BLOB(), "__BLOB_FIELD", null)),
false);
// Write more data with both f2 and f3
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java
new file mode 100644
index 0000000000..1813fe2ecf
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.CoreOptions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link BlobSchemaUtils}. */
+public class BlobSchemaUtilsTest {
+
+ @Test
+ public void testParseAddColumnComment() {
+ // null and non-directive comments are passthrough.
+ assertThat(BlobSchemaUtils.parseAddColumnComment(null)).isNull();
+ assertThat(BlobSchemaUtils.parseAddColumnComment("")).isNull();
+ assertThat(BlobSchemaUtils.parseAddColumnComment("normal user comment")).isNull();
+ // case-sensitive: lowercase is not a directive.
+ assertThat(BlobSchemaUtils.parseAddColumnComment("__blob_field; x")).isNull();
+
+ // bare BLOB_FIELD directive
+ BlobSchemaUtils.ParsedDirective bareBlob =
+ BlobSchemaUtils.parseAddColumnComment(BlobSchemaUtils.BLOB_FIELD_DIRECTIVE);
+ assertThat(bareBlob.optionKey()).isEqualTo(CoreOptions.BLOB_FIELD.key());
+ assertThat(bareBlob.realComment()).isNull();
+
+ // BLOB_FIELD with trailing semicolon only — still no real comment
+ BlobSchemaUtils.ParsedDirective trailingSemi =
+ BlobSchemaUtils.parseAddColumnComment(BlobSchemaUtils.BLOB_FIELD_DIRECTIVE + ";");
+ assertThat(trailingSemi.realComment()).isNull();
+
+ // BLOB_FIELD with real comment (note inner whitespace trimmed).
+ BlobSchemaUtils.ParsedDirective withComment =
+ BlobSchemaUtils.parseAddColumnComment(
+ BlobSchemaUtils.BLOB_FIELD_DIRECTIVE + "; profile picture ");
+ assertThat(withComment.optionKey()).isEqualTo(CoreOptions.BLOB_FIELD.key());
+ assertThat(withComment.realComment()).isEqualTo("profile picture");
+
+ // BLOB_DESCRIPTOR_FIELD directive
+ BlobSchemaUtils.ParsedDirective descriptor =
+ BlobSchemaUtils.parseAddColumnComment(
+ BlobSchemaUtils.BLOB_DESCRIPTOR_FIELD_DIRECTIVE + "; desc text");
+ assertThat(descriptor.optionKey()).isEqualTo(CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
+ assertThat(descriptor.realComment()).isEqualTo("desc text");
+ }
+
+ @Test
+ public void testParseRejectsUnknownDirective() {
+ assertThatThrownBy(() -> BlobSchemaUtils.parseAddColumnComment("__BLOB_VIEW_FIELD; x"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unsupported BLOB directive");
+ assertThatThrownBy(() -> BlobSchemaUtils.parseAddColumnComment("__BLOB_UNKNOWN"))
+ .isInstanceOf(IllegalArgumentException.class);
+ // a __BLOB_FIELD prefix without a `;` boundary is not a valid directive.
+ assertThatThrownBy(() -> BlobSchemaUtils.parseAddColumnComment("__BLOB_FIELDX"))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void testModifyBlobOptions() {
+ Map<String, String> opts = new HashMap<>();
+ BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_FIELD.key(), "a", opts);
+ assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a");
+
+ BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_FIELD.key(), "b", opts);
+ assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a,b");
+
+ BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "c", opts);
+ assertThat(opts).containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "c");
+
+ assertThatThrownBy(
+ () ->
+ BlobSchemaUtils.modifyBlobOptions(
+ "not-a-blob-option", "x", new HashMap<>()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unsupported BLOB directive");
+ }
+
+ @Test
+ public void testModifyBlobOptionsMigratesLegacyFallbackKey() {
+ // legacy option holds the descriptor field; canonical key absent.
+ Map<String, String> opts = new HashMap<>();
+ opts.put("blob.stored-descriptor-fields", "legacy_col");
+
+ BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "new_col", opts);
+
+ // fallback value is migrated to canonical key, fallback key removed to avoid stale data.
+ assertThat(opts)
+ .containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "legacy_col,new_col");
+ assertThat(opts).doesNotContainKey("blob.stored-descriptor-fields");
+ }
+
+ @Test
+ public void testRemoveFromBlobOptions() {
+ // pre-populated with all 4 canonical keys + the legacy fallback for descriptor.
+ Map<String, String> opts = new HashMap<>();
+ opts.put(CoreOptions.BLOB_FIELD.key(), "a,b");
+ opts.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "b,c");
+ opts.put(CoreOptions.BLOB_VIEW_FIELD.key(), "b");
+ opts.put(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "b");
+ opts.put("blob.stored-descriptor-fields", "b,legacy");
+
+ BlobSchemaUtils.removeFromBlobOptions("b", opts);
+
+ // b removed from every csv; keys whose csv becomes empty are dropped.
+ assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a");
+ assertThat(opts).containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "c");
+ assertThat(opts).doesNotContainKey(CoreOptions.BLOB_VIEW_FIELD.key());
+ assertThat(opts).doesNotContainKey(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key());
+ assertThat(opts).containsEntry("blob.stored-descriptor-fields", "legacy");
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 58d82b0bc9..a27c1a1251 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -41,8 +41,10 @@ import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.LazyField;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -188,6 +190,223 @@ public class SchemaEvolutionTest {
columnName, identifier.getFullName());
}
+ @Test
+ public void testAddBlobColumnViaCommentDirective() throws Exception {
+ // create table with one pre-existing BLOB column registered in blob-field, so we can
+ // also verify that ADD COLUMN appends to (rather than overwrites) the existing value.
+ Map<String, String> options = blobEnabledOptions();
+ options.put(CoreOptions.BLOB_FIELD.key(), "existing_col");
+ schemaManager.createTable(
+ new Schema(
+ RowType.of(
+ new DataField[] {
+ new DataField(0, "k", DataTypes.INT()),
+ new DataField(
+ 1, "existing_col", DataTypes.BLOB().copy(true))
+ })
+ .getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options,
+ ""));
+
+ // bare directive: no user comment, appended to existing blob-field value.
+ // directive + user comment, SDK caller passes a BlobType directly (allowed when
+ // accompanied by a directive so the storage mode is explicit).
+ schemaManager.commitChanges(
+ ImmutableList.of(
+ SchemaChange.addColumn("picture", DataTypes.BYTES(), "__BLOB_FIELD", null),
+ SchemaChange.addColumn(
+ "desc_col",
+ DataTypes.BLOB(),
+ "__BLOB_DESCRIPTOR_FIELD; descriptor comment",
+ null)));
+
+ TableSchema latest = schemaManager.latest().get();
+
+ DataField picture =
+ latest.fields().stream().filter(f -> f.name().equals("picture")).findFirst().get();
+ assertThat(picture.type().getTypeRoot()).isEqualTo(DataTypeRoot.BLOB);
+ assertThat(picture.description()).isNull();
+
+ DataField desc =
+ latest.fields().stream().filter(f -> f.name().equals("desc_col")).findFirst().get();
+ assertThat(desc.type().getTypeRoot()).isEqualTo(DataTypeRoot.BLOB);
+ assertThat(desc.description()).isEqualTo("descriptor comment");
+
+ assertThat(latest.options().get(CoreOptions.BLOB_FIELD.key()))
+ .isEqualTo("existing_col,picture");
+ assertThat(latest.options().get(CoreOptions.BLOB_DESCRIPTOR_FIELD.key()))
+ .isEqualTo("desc_col");
+ }
+
+ @Test
+ public void testAddBlobColumnErrors() throws Exception {
+ schemaManager.createTable(
+ new Schema(
+ RowType.of(
+ new DataField[] {
+ new DataField(0, "k", DataTypes.INT()),
+ new DataField(
+ 1,
+ "nested",
+ DataTypes.ROW(
+ new DataField(2, "a", DataTypes.INT())))
+ })
+ .getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(),
+ ""));
+
+ // non-BYTES/BINARY type rejected.
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.addColumn(
+ "bad",
+ DataTypes.INT(),
+ "__BLOB_FIELD",
+ null))))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("must be of BYTES, BINARY or BLOB type");
+
+ // nested column rejected.
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.addColumn(
+ new String[] {"nested", "blob"},
+ DataTypes.BYTES(),
+ "__BLOB_FIELD",
+ null))))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("nested column");
+
+ // unknown __BLOB directive rejected (e.g. __BLOB_VIEW_FIELD is not supported).
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.addColumn(
+ "x",
+ DataTypes.BYTES(),
+ "__BLOB_VIEW_FIELD",
+ null))))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unsupported BLOB directive");
+
+ // raw BlobType without any directive rejected: SDK callers must go through the
+ // directive path so the storage mode (blob-field vs blob-descriptor-field) is explicit.
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.addColumn(
+ "raw_blob", DataTypes.BLOB(), null, null))))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("requires a comment directive");
+
+ // SET OPTION on blob-field is rejected (the option is @Immutable).
+ TableSchema oldSchema = schemaManager.latest().get();
+ LazyField<Boolean> hasSnapshots = new LazyField<>(() -> true);
+ LazyField<Identifier> lazyId = new LazyField<>(() -> identifier);
+ assertThatThrownBy(
+ () ->
+ SchemaManager.generateTableSchema(
+ oldSchema,
+ Collections.singletonList(
+ SchemaChange.setOption(
+ CoreOptions.BLOB_FIELD.key(), "k")),
+ hasSnapshots,
+ lazyId))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining(CoreOptions.BLOB_FIELD.key());
+ }
+
+ @Test
+ public void testDropBlobColumnCleansOptions() throws Exception {
+ // table with one descriptor BLOB col registered in both blob-descriptor-field and
+ // blob-external-storage-field (subset rule), and one normal blob col in blob-field.
+ Map<String, String> options = blobEnabledOptions();
+ options.put(CoreOptions.BLOB_FIELD.key(), "pic");
+ options.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "ext");
+ options.put(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "ext");
+ options.put(CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key(), "/tmp/blob-ext");
+ schemaManager.createTable(
+ new Schema(
+ RowType.of(
+ new DataField[] {
+ new DataField(0, "k", DataTypes.INT()),
+ new DataField(1, "pic", DataTypes.BLOB().copy(true)),
+ new DataField(2, "ext", DataTypes.BLOB().copy(true))
+ })
+ .getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options,
+ ""));
+
+ // drop the descriptor BLOB column: it must vanish from both descriptor-field and
+ // external-storage-field; the other BLOB column is untouched.
+ schemaManager.commitChanges(Collections.singletonList(SchemaChange.dropColumn("ext")));
+
+ TableSchema latest = schemaManager.latest().get();
+ assertThat(latest.options().get(CoreOptions.BLOB_FIELD.key())).isEqualTo("pic");
+ assertThat(latest.options()).doesNotContainKey(CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
+ assertThat(latest.options())
+ .doesNotContainKey(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key());
+ }
+
+ @Test
+ public void testUpdateColumnTypeOnBlobIsRejected() throws Exception {
+ Map<String, String> options = blobEnabledOptions();
+ options.put(CoreOptions.BLOB_FIELD.key(), "pic");
+ schemaManager.createTable(
+ new Schema(
+ RowType.of(
+ new DataField[] {
+ new DataField(0, "k", DataTypes.INT()),
+ new DataField(1, "pic", DataTypes.BLOB().copy(true)),
+ new DataField(2, "raw", DataTypes.BYTES())
+ })
+ .getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options,
+ ""));
+
+ // BLOB -> BYTES rejected.
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.updateColumnType(
+ "pic", DataTypes.BYTES()))))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("BLOB");
+
+ // BYTES -> BLOB rejected (must be added via ADD COLUMN directive instead).
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.updateColumnType(
+ "raw", DataTypes.BLOB()))))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("BLOB");
+ }
+
+ private static Map<String, String> blobEnabledOptions() {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ options.put(CoreOptions.BUCKET.key(), "-1");
+ return options;
+ }
+
@Test
public void testUpdateFieldType() throws Exception {
Schema schema =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index d51c1d006a..8e6ae0b123 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1839,4 +1839,25 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
UnsupportedOperationException.class,
"Cannot drop primary keys on a non-empty table."));
}
+
+ private static final String BLOB_TABLE_OPTIONS =
+ "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'bucket'='-1'";
+
+ @Test
+ public void testAddBlobColumnViaCommentDirective() {
+ sql("CREATE TABLE T (id INT, data STRING) WITH (" + BLOB_TABLE_OPTIONS + ")");
+
+ // bare directive — no user comment
+ sql("ALTER TABLE T ADD desc_col BYTES COMMENT '__BLOB_DESCRIPTOR_FIELD'");
+ // directive + user comment
+ sql("ALTER TABLE T ADD picture BYTES COMMENT '__BLOB_FIELD; profile picture'");
+
+ String createSql = sql("SHOW CREATE TABLE T").get(0).toString();
+ assertThat(createSql).doesNotContain("__BLOB");
+ assertThat(createSql).contains("`desc_col`");
+ assertThat(createSql).contains("`picture`");
+ assertThat(createSql).contains("COMMENT 'profile picture'");
+ assertThat(createSql).contains("'blob-field' = 'picture'");
+ assertThat(createSql).contains("'blob-descriptor-field' = 'desc_col'");
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index fc8b4adb6e..7afe3c76cf 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -1080,4 +1080,38 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
.containsExactlyInAnyOrder(
"[1,APPLE,1000000000000]", "[2,cat,200]",
"[3,FLOWER,3000000000000]");
}
+
+ private static final String BLOB_TABLE_PROPS =
+ "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'bucket'='-1'";
+
+ @Test
+ public void testAddBlobColumnViaCommentDirective() {
+ String table = "paimon.default.blob_add_col";
+ spark.sql(
+ "CREATE TABLE "
+ + table
+ + " (id INT, data STRING) TBLPROPERTIES ("
+ + BLOB_TABLE_PROPS
+ + ")");
+
+ // bare directive — no user comment
+ spark.sql(
+ "ALTER TABLE "
+ + table
+ + " ADD COLUMN desc_col BINARY COMMENT '__BLOB_DESCRIPTOR_FIELD'");
+ // directive + user comment
+ spark.sql(
+ "ALTER TABLE "
+ + table
+ + " ADD COLUMN picture BINARY COMMENT '__BLOB_FIELD; profile picture'");
+
+ String createSql =
+ spark.sql("SHOW CREATE TABLE " + table).collectAsList().get(0).toString();
+ assertThat(createSql).doesNotContain("__BLOB");
+ assertThat(createSql).contains("desc_col");
+ assertThat(createSql).contains("picture");
+ assertThat(createSql).contains("profile picture");
+ assertThat(createSql).contains("'blob-field' = 'picture'");
+ assertThat(createSql).contains("'blob-descriptor-field' = 'desc_col'");
+ }
}