This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f908ed6168 [core] Allow blob inline fields without blob-field (#7827)
f908ed6168 is described below

commit f908ed61688e8843dd63b1ebaaec50dfa550a878
Author: YeJunHao <[email protected]>
AuthorDate: Wed May 13 16:34:14 2026 +0800

    [core] Allow blob inline fields without blob-field (#7827)
---
 docs/content/append-table/blob.md                  | 11 ++--
 .../shortcodes/generated/core_configuration.html   |  6 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  8 ++-
 .../org/apache/paimon/schema/SchemaValidation.java | 30 ++--------
 .../org/apache/paimon/append/BlobTableTest.java    | 66 ++++++++--------------
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 34 +++++------
 .../org/apache/paimon/flink/BlobTableITCase.java   | 32 +++++------
 .../java/org/apache/paimon/spark/SparkCatalog.java |  6 +-
 .../org/apache/paimon/spark/sql/BlobTestBase.scala |  2 +-
 9 files changed, 80 insertions(+), 115 deletions(-)

diff --git a/docs/content/append-table/blob.md 
b/docs/content/append-table/blob.md
index 73124964f1..d5cfc324f1 100644
--- a/docs/content/append-table/blob.md
+++ b/docs/content/append-table/blob.md
@@ -105,7 +105,7 @@ This allows one table to mix raw-data BLOB fields, 
descriptor-only BLOB fields,
       <td>No</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Specifies column names that should be stored as blob type. This is 
used when you want to treat a BYTES column as a BLOB.</td>
+      <td>Specifies column names that should be stored as blob type. This is 
used when you want to treat a BYTES column as a BLOB. Fields listed in 
<code>blob-descriptor-field</code> or <code>blob-view-field</code> are also 
treated as BLOB fields.</td>
     </tr>
     <tr>
       <td><h5>blob-as-descriptor</h5></td>
@@ -120,7 +120,7 @@ This allows one table to mix raw-data BLOB fields, 
descriptor-only BLOB fields,
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>
-        Comma-separated BLOB field names stored as serialized 
<code>BlobDescriptor</code> bytes inline in normal data files.
+        Comma-separated field names treated as BLOB fields and stored as 
serialized <code>BlobDescriptor</code> bytes inline in normal data files.
         By default, all blob fields store blob bytes in separate 
<code>.blob</code> files.
         If configured, one table can mix:
         some BLOB fields in <code>.blob</code> files and some as descriptor 
references.
@@ -132,9 +132,9 @@ This allows one table to mix raw-data BLOB fields, 
descriptor-only BLOB fields,
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>
-        Comma-separated BLOB field names stored as serialized 
<code>BlobViewStruct</code> bytes inline in normal data files.
+        Comma-separated field names treated as BLOB fields and stored as 
serialized <code>BlobViewStruct</code> bytes inline in normal data files.
         The field values reference BLOB values in upstream tables and are 
resolved at read time.
-        This option must be a subset of <code>blob-field</code> and must not 
overlap with <code>blob-descriptor-field</code>.
+        This option must not overlap with <code>blob-descriptor-field</code>.
       </td>
     </tr>
     <tr>
@@ -300,7 +300,7 @@ Blob view is useful when a downstream table should 
reference BLOB values already
 Blob view requires:
 
 - the upstream table to have row tracking enabled, so each row has a stable 
`_ROW_ID`
-- the downstream field to be listed in both `blob-field` and `blob-view-field`
+- the downstream field to be listed in `blob-view-field`
 - writes to provide a serialized `BlobViewStruct`; in Flink SQL, use the 
built-in `sys.blob_view` function
 
 The Flink SQL function signature is:
@@ -335,7 +335,6 @@ CREATE TABLE image_view_table (
 ) WITH (
     'row-tracking.enabled' = 'true',
     'data-evolution.enabled' = 'true',
-    'blob-field' = 'image_ref',
     'blob-view-field' = 'image_ref'
 );
 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 5faf8276da..1928fde9dd 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -66,7 +66,7 @@ under the License.
             <td><h5>blob-descriptor-field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Comma-separated BLOB field names, selected from blob-field, to 
store as serialized BlobDescriptor bytes inline in data files.</td>
+            <td>Comma-separated field names to treat as BLOB fields and store 
as serialized BlobDescriptor bytes inline in data files.</td>
         </tr>
         <tr>
             <td><h5>blob-external-storage-field</h5></td>
@@ -84,13 +84,13 @@ under the License.
             <td><h5>blob-field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Specifies column names that should be stored as blob type. 
This is used when you want to treat a BYTES column as a BLOB.</td>
+            <td>Specifies column names that should be stored as blob type. 
This is used when you want to treat a BYTES column as a BLOB. Fields listed in 
blob-descriptor-field or blob-view-field are also treated as BLOB fields.</td>
         </tr>
         <tr>
             <td><h5>blob-view-field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Comma-separated BLOB field names, selected from blob-field, to 
store as serialized BlobViewStruct bytes inline in data files and resolve from 
upstream tables at read time.</td>
+            <td>Comma-separated field names to treat as BLOB fields and store 
as serialized BlobViewStruct bytes inline in data files and resolve from 
upstream tables at read time.</td>
         </tr>
         <tr>
             <td><h5>blob.split-by-file-size</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index a420110828..8ab03c868c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2277,7 +2277,9 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withDescription(
                             "Specifies column names that should be stored as 
blob type. "
-                                    + "This is used when you want to treat a 
BYTES column as a BLOB.");
+                                    + "This is used when you want to treat a 
BYTES column as a BLOB. "
+                                    + "Fields listed in blob-descriptor-field 
or blob-view-field "
+                                    + "are also treated as BLOB fields.");
 
     @Immutable
     public static final ConfigOption<String> BLOB_DESCRIPTOR_FIELD =
@@ -2286,7 +2288,7 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withFallbackKeys("blob.stored-descriptor-fields")
                     .withDescription(
-                            "Comma-separated BLOB field names, selected from 
blob-field, to store "
+                            "Comma-separated field names to treat as BLOB 
fields and store "
                                     + "as serialized BlobDescriptor bytes 
inline in data files.");
 
     @Immutable
@@ -2295,7 +2297,7 @@ public class CoreOptions implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Comma-separated BLOB field names, selected from 
blob-field, to store "
+                            "Comma-separated field names to treat as BLOB 
fields and store "
                                     + "as serialized BlobViewStruct bytes 
inline in data files and "
                                     + "resolve from upstream tables at read 
time.");
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index d2b9297317..b5285bb606 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -165,11 +165,10 @@ public class SchemaValidation {
         FileFormat fileFormat =
                 FileFormat.fromIdentifier(options.formatType(), new 
Options(schema.options()));
         RowType tableRowType = new RowType(schema.fields());
-        Set<String> blobFields = validateBlobFields(tableRowType, options);
-        Set<String> blobDescriptorFields =
-                validateBlobDescriptorFields(tableRowType, options, 
blobFields);
+        validateBlobFields(tableRowType, options);
+        Set<String> blobDescriptorFields = 
validateBlobDescriptorFields(tableRowType, options);
         Set<String> blobViewFields =
-                validateBlobViewFields(tableRowType, options, blobFields, 
blobDescriptorFields);
+                validateBlobViewFields(tableRowType, options, 
blobDescriptorFields);
         Set<String> blobInlineFields = new HashSet<>(blobDescriptorFields);
         blobInlineFields.addAll(blobViewFields);
         validateBlobExternalStorageFields(tableRowType, options, 
blobDescriptorFields);
@@ -719,7 +718,7 @@ public class SchemaValidation {
         }
     }
 
-    private static Set<String> validateBlobFields(RowType rowType, CoreOptions 
options) {
+    private static void validateBlobFields(RowType rowType, CoreOptions 
options) {
         Set<String> blobFieldNames =
                 rowType.getFields().stream()
                         .filter(field -> field.type().getTypeRoot() == 
DataTypeRoot.BLOB)
@@ -735,11 +734,9 @@ public class SchemaValidation {
                     field,
                     CoreOptions.BLOB_FIELD.key());
         }
-        return configured;
     }
 
-    private static Set<String> validateBlobDescriptorFields(
-            RowType rowType, CoreOptions options, Set<String> blobFields) {
+    private static Set<String> validateBlobDescriptorFields(RowType rowType, 
CoreOptions options) {
         Set<String> blobFieldNames =
                 rowType.getFields().stream()
                         .filter(field -> field.type().getTypeRoot() == 
DataTypeRoot.BLOB)
@@ -752,21 +749,12 @@ public class SchemaValidation {
                     "Field '%s' in '%s' must be a BLOB field in table schema.",
                     field,
                     CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
-            checkArgument(
-                    blobFields.contains(field),
-                    "Field '%s' in '%s' must also be in '%s'.",
-                    field,
-                    CoreOptions.BLOB_DESCRIPTOR_FIELD.key(),
-                    CoreOptions.BLOB_FIELD.key());
         }
         return configured;
     }
 
     private static Set<String> validateBlobViewFields(
-            RowType rowType,
-            CoreOptions options,
-            Set<String> blobFields,
-            Set<String> blobDescriptorFields) {
+            RowType rowType, CoreOptions options, Set<String> 
blobDescriptorFields) {
         Set<String> blobFieldNames =
                 rowType.getFields().stream()
                         .filter(field -> field.type().getTypeRoot() == 
DataTypeRoot.BLOB)
@@ -779,12 +767,6 @@ public class SchemaValidation {
                     "Field '%s' in '%s' must be a BLOB field in table schema.",
                     field,
                     CoreOptions.BLOB_VIEW_FIELD.key());
-            checkArgument(
-                    blobFields.contains(field),
-                    "Field '%s' in '%s' must also be in '%s'.",
-                    field,
-                    CoreOptions.BLOB_VIEW_FIELD.key(),
-                    CoreOptions.BLOB_FIELD.key());
             checkArgument(
                     !blobDescriptorFields.contains(field),
                     "Field '%s' in '%s' can not also be in '%s'.",
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index b86e43be48..38aabeda6a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -51,6 +51,7 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.system.RowTrackingTable;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Range;
@@ -497,49 +498,32 @@ public class BlobTableTest extends TableTestBase {
     }
 
     @Test
-    public void testBlobViewFieldMustBeSubsetOfBlobField() {
-        assertThatThrownBy(
-                        () -> {
-                            Schema.Builder schemaBuilder = Schema.newBuilder();
-                            schemaBuilder.column("f0", DataTypes.INT());
-                            schemaBuilder.column("f1", DataTypes.STRING());
-                            schemaBuilder.column("f2", DataTypes.BLOB());
-                            
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
-                            
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
-                            
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
-                            
schemaBuilder.option(CoreOptions.BLOB_VIEW_FIELD.key(), "f2");
-                            catalog.createTable(identifier(), 
schemaBuilder.build(), true);
-                        })
-                .hasRootCauseInstanceOf(IllegalArgumentException.class)
-                .hasRootCauseMessage(
-                        "Field 'f2' in '"
-                                + CoreOptions.BLOB_VIEW_FIELD.key()
-                                + "' must also be in '"
-                                + CoreOptions.BLOB_FIELD.key()
-                                + "'.");
+    public void testBlobInlineFieldCanDeclareBlobWithoutBlobField() throws 
Exception {
+        assertCreateBlobInlineFieldWithoutBlobField(
+                "blob_descriptor_without_blob_field", 
CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
+        assertCreateBlobInlineFieldWithoutBlobField(
+                "blob_view_without_blob_field", 
CoreOptions.BLOB_VIEW_FIELD.key());
     }
 
-    @Test
-    public void testBlobDescriptorFieldMustBeSubsetOfBlobField() {
-        assertThatThrownBy(
-                        () -> {
-                            Schema.Builder schemaBuilder = Schema.newBuilder();
-                            schemaBuilder.column("f0", DataTypes.INT());
-                            schemaBuilder.column("f1", DataTypes.STRING());
-                            schemaBuilder.column("f2", DataTypes.BLOB());
-                            
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
-                            
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
-                            
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
-                            
schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
-                            catalog.createTable(identifier(), 
schemaBuilder.build(), true);
-                        })
-                .hasRootCauseInstanceOf(IllegalArgumentException.class)
-                .hasRootCauseMessage(
-                        "Field 'f2' in '"
-                                + CoreOptions.BLOB_DESCRIPTOR_FIELD.key()
-                                + "' must also be in '"
-                                + CoreOptions.BLOB_FIELD.key()
-                                + "'.");
+    private void assertCreateBlobInlineFieldWithoutBlobField(String tableName, 
String optionKey)
+            throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(optionKey, "f2");
+
+        catalog.createTable(identifier(tableName), schemaBuilder.build(), 
true);
+
+        assertThat(
+                        catalog.getTable(identifier(tableName))
+                                .rowType()
+                                .getTypeAt(2)
+                                .is(DataTypeRoot.BLOB))
+                .isTrue();
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index df225feba6..cae85c2386 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -129,6 +129,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -1091,11 +1092,7 @@ public class FlinkCatalog extends AbstractCatalog {
 
         Map<String, String> options = new HashMap<>(catalogTable.getOptions());
         List<String> blobFields = CoreOptions.blobField(options);
-        Set<String> blobDescriptorFields = new 
CoreOptions(options).blobDescriptorField();
-        List<String> blobViewFields = CoreOptions.blobViewField(options);
-        validateSecondaryBlobFields(
-                blobFields, blobDescriptorFields, 
CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
-        validateSecondaryBlobFields(blobFields, blobViewFields, 
CoreOptions.BLOB_VIEW_FIELD.key());
+        Set<String> blobTypeFields = blobTypeFields(options);
         if (!blobFields.isEmpty()) {
             checkArgument(
                     
options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()),
@@ -1124,30 +1121,29 @@ public class FlinkCatalog extends AbstractCatalog {
                         field ->
                                 schemaBuilder.column(
                                         field.getName(),
-                                        resolveDataType(field.getName(), 
field.getType(), options),
+                                        resolveDataType(
+                                                field.getName(),
+                                                field.getType(),
+                                                options,
+                                                blobTypeFields),
                                         columnComments.get(field.getName())));
 
         return schemaBuilder.build();
     }
 
-    private static void validateSecondaryBlobFields(
-            List<String> blobFields, Iterable<String> secondaryBlobFields, 
String optionKey) {
-        for (String secondaryBlobField : secondaryBlobFields) {
-            checkArgument(
-                    blobFields.contains(secondaryBlobField),
-                    "Field '%s' in '%s' must also be in '%s'.",
-                    secondaryBlobField,
-                    optionKey,
-                    CoreOptions.BLOB_FIELD.key());
-        }
+    private static Set<String> blobTypeFields(Map<String, String> options) {
+        Set<String> blobTypeFields = new 
HashSet<>(CoreOptions.blobField(options));
+        blobTypeFields.addAll(new CoreOptions(options).blobDescriptorField());
+        blobTypeFields.addAll(CoreOptions.blobViewField(options));
+        return blobTypeFields;
     }
 
     private static org.apache.paimon.types.DataType resolveDataType(
             String fieldName,
             org.apache.flink.table.types.logical.LogicalType logicalType,
-            Map<String, String> options) {
-        List<String> blobFields = CoreOptions.blobField(options);
-        if (blobFields.contains(fieldName)) {
+            Map<String, String> options,
+            Set<String> blobTypeFields) {
+        if (blobTypeFields.contains(fieldName)) {
             return toBlobType(logicalType);
         }
         Set<String> vectorFields = CoreOptions.vectorField(options);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 01e91542c6..1b56bdd74d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -27,6 +27,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.rest.TestHttpWebServer;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.UriReader;
 import org.apache.paimon.utils.UriReaderFactory;
@@ -191,7 +192,6 @@ public class BlobTableITCase extends CatalogITCaseBase {
                 "CREATE TABLE downstream_blob_view (id INT, label STRING, 
image_ref BYTES)"
                         + " WITH ('row-tracking.enabled'='true',"
                         + " 'data-evolution.enabled'='true',"
-                        + " 'blob-field'='image_ref',"
                         + " 'blob-view-field'='image_ref')");
 
         batchSql(
@@ -251,26 +251,24 @@ public class BlobTableITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testBlobInlineFieldRequiresBlobField() {
-        assertSecondaryBlobFieldRequiresBlobField(
+    public void testBlobInlineFieldCanDeclareBlobWithoutBlobField() throws 
Exception {
+        assertSecondaryBlobFieldCanDeclareBlobWithoutBlobField(
                 "blob_descriptor_without_blob_field", "blob-descriptor-field");
-        assertSecondaryBlobFieldRequiresBlobField(
+        assertSecondaryBlobFieldCanDeclareBlobWithoutBlobField(
                 "blob_view_without_blob_field", "blob-view-field");
     }
 
-    private void assertSecondaryBlobFieldRequiresBlobField(String tableName, 
String optionKey) {
-        assertThatThrownBy(
-                        () ->
-                                tEnv.executeSql(
-                                        String.format(
-                                                "CREATE TABLE %s (id INT, 
picture BYTES)"
-                                                        + " WITH 
('row-tracking.enabled'='true',"
-                                                        + " 
'data-evolution.enabled'='true',"
-                                                        + " '%s'='picture')",
-                                                tableName, optionKey)))
-                .hasRootCauseInstanceOf(IllegalArgumentException.class)
-                .hasRootCauseMessage(
-                        "Field 'picture' in '" + optionKey + "' must also be 
in 'blob-field'.");
+    private void assertSecondaryBlobFieldCanDeclareBlobWithoutBlobField(
+            String tableName, String optionKey) throws Exception {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE %s (id INT, picture BYTES)"
+                                + " WITH ('row-tracking.enabled'='true',"
+                                + " 'data-evolution.enabled'='true',"
+                                + " '%s'='picture')",
+                        tableName, optionKey));
+
+        
assertThat(paimonTable(tableName).rowType().getTypeAt(1).is(DataTypeRoot.BLOB)).isTrue();
     }
 
     @Test
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 3ab8406931..c68e7768ab 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -83,6 +83,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
@@ -457,6 +458,7 @@ public class SparkCatalog extends SparkBaseCatalog
             StructType schema, Transform[] partitions, Map<String, String> 
properties) {
         Map<String, String> normalizedProperties = new HashMap<>(properties);
         List<String> blobFields = CoreOptions.blobField(properties);
+        Set<String> blobDescriptorFields = new 
CoreOptions(properties).blobDescriptorField();
         List<String> blobViewFields = CoreOptions.blobViewField(properties);
         String provider = properties.get(TableCatalog.PROP_PROVIDER);
         if (!usePaimon(provider)) {
@@ -491,7 +493,9 @@ public class SparkCatalog extends SparkBaseCatalog
         for (StructField field : schema.fields()) {
             String name = field.name();
             DataType type;
-            if (blobFields.contains(name) || blobViewFields.contains(name)) {
+            if (blobFields.contains(name)
+                    || blobDescriptorFields.contains(name)
+                    || blobViewFields.contains(name)) {
                 checkArgument(
                         field.dataType() instanceof 
org.apache.spark.sql.types.BinaryType,
                         "The type of blob field must be binary");
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 2ff6eb308d..38d9793d2d 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -399,7 +399,7 @@ class BlobTestBase extends PaimonSparkTestBase {
       sql(
         "CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " +
           "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " +
-          "'blob-field'='picture', 'blob-descriptor-field'='picture')")
+          "'blob-descriptor-field'='picture')")
 
       // Insert with a descriptor pointing to a real file
       val blobData = new Array[Byte](256)

Reply via email to