This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 175c9506da [core] Support catalog-aware blob view function (#7786)
175c9506da is described below

commit 175c9506da3a2fd3a6936d797f83233ec8d75d06
Author: YeJunHao <[email protected]>
AuthorDate: Sat May 9 12:58:06 2026 +0800

    [core] Support catalog-aware blob view function (#7786)
---
 docs/content/append-table/blob.md                  |  86 +++++++++++-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  53 +++++++
 .../paimon/flink/function/BlobViewFunction.java    | 152 ++++++++++++++++++++-
 ...ViewFunction.java => CatalogAwareFunction.java} |  16 +--
 .../org/apache/paimon/flink/BlobTableITCase.java   |  48 +++++--
 5 files changed, 331 insertions(+), 24 deletions(-)

diff --git a/docs/content/append-table/blob.md b/docs/content/append-table/blob.md
index 8180fdf28e..73124964f1 100644
--- a/docs/content/append-table/blob.md
+++ b/docs/content/append-table/blob.md
@@ -71,7 +71,7 @@ For details about the blob file format structure, see [File Format - BLOB]({{< r
 
 ## Storage Modes
 
-Paimon supports three storage modes for BLOB fields:
+Paimon supports four storage modes for BLOB fields:
 
 1. **Default blob storage**
    Blob bytes are written to Paimon-managed `.blob` files under the table path.
@@ -82,7 +82,10 @@ Paimon supports three storage modes for BLOB fields:
 3. **External-storage descriptor mode**
    Fields configured in `blob-external-storage-field` are a subset of `blob-descriptor-field`. At write time, Paimon writes the raw blob data to the configured `blob-external-storage-path` and stores only serialized `BlobDescriptor` bytes inline in data files.
 
-This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, and descriptor-based BLOB fields backed by external storage.
+4. **Blob view storage**
+   Fields configured in `blob-view-field` store serialized `BlobViewStruct` bytes inline in data files. The struct points to a BLOB value in an upstream table by table identifier, BLOB field, and row id. The actual blob bytes are resolved from the upstream table at read time.
+
+This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, descriptor-based BLOB fields backed by external storage, and view fields that reference upstream BLOB values.
 
 ## Table Options
 
@@ -123,6 +126,17 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields,
         some BLOB fields in <code>.blob</code> files and some as descriptor references.
       </td>
     </tr>
+    <tr>
+      <td><h5>blob-view-field</h5></td>
+      <td>No</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>
+        Comma-separated BLOB field names stored as serialized <code>BlobViewStruct</code> bytes inline in normal data files.
+        The field values reference BLOB values in upstream tables and are resolved at read time.
+        This option must be a subset of <code>blob-field</code> and must not overlap with <code>blob-descriptor-field</code>.
+      </td>
+    </tr>
     <tr>
       <td><h5>blob-external-storage-field</h5></td>
       <td>No</td>
@@ -279,6 +293,69 @@ ALTER TABLE blob_table SET ('blob-as-descriptor' = 'false');
 SELECT image FROM blob_table;
 ```
 
+### Blob View
+
+Blob view is useful when a downstream table should reference BLOB values already stored in an upstream table, without copying the bytes or creating new `.blob` files. A blob view field stores only a small `BlobViewStruct` inline. When the field is read, Paimon resolves the referenced BLOB from the upstream table.
+
+Blob view requires:
+
+- the upstream table to have row tracking enabled, so each row has a stable `_ROW_ID`
+- the downstream field to be listed in both `blob-field` and `blob-view-field`
+- writes to provide a serialized `BlobViewStruct`; in Flink SQL, use the built-in `sys.blob_view` function
+
+The Flink SQL function signature is:
+
+```sql
+sys.blob_view(table_name, field_name, row_id)
+```
+
+Arguments:
+
+- `table_name`: the upstream table name. It must be fully qualified as `database.table` or `catalog.database.table`. Unqualified table names are rejected.
+- `field_name`: the upstream BLOB field name.
+- `row_id`: the `_ROW_ID` value from the upstream row-tracking table.
+
+The following example writes a downstream table whose `image_ref` field views the `image` field in `image_table`:
+
+```sql
+CREATE TABLE image_table (
+    id INT,
+    name STRING,
+    image BYTES
+) WITH (
+    'row-tracking.enabled' = 'true',
+    'data-evolution.enabled' = 'true',
+    'blob-field' = 'image'
+);
+
+CREATE TABLE image_view_table (
+    id INT,
+    label STRING,
+    image_ref BYTES
+) WITH (
+    'row-tracking.enabled' = 'true',
+    'data-evolution.enabled' = 'true',
+    'blob-field' = 'image_ref',
+    'blob-view-field' = 'image_ref'
+);
+
+INSERT INTO image_view_table
+SELECT
+    id,
+    name AS label,
+    sys.blob_view('default.image_table', 'image', _ROW_ID)
+FROM `image_table$row_tracking`;
+```
+
+The function also accepts `catalog.database.table`, provided the catalog part matches the name of the current Paimon catalog:
+
+```sql
+SELECT sys.blob_view('my_catalog.default.image_table', 'image', _ROW_ID)
+FROM `image_table$row_tracking`;
+```
+
+Reads from `image_view_table.image_ref` return the referenced BLOB bytes in the same way as normal blob fields. The referenced upstream table and row must remain available for the view to be resolved.
+
 ### MERGE INTO Support
 
 For Data Evolution writes in Flink and Spark:
@@ -643,6 +720,7 @@ For these configured fields:
 3. **No Statistics**: Statistics collection is not supported for blob columns.
 4. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled` must be set to `true`.
 5. **External Storage Cleanup**: Files written through `blob-external-storage-path` are outside Paimon's orphan file cleanup scope.
+6. **Blob View Dependency**: Blob view fields depend on the referenced upstream table and row. If the upstream data is removed or no longer readable, the view cannot be resolved.
 
 ## Best Practices
 
@@ -656,4 +734,6 @@ For these configured fields:
 
 5. **Manage External Storage Lifecycle Separately**: Files written to `blob-external-storage-path` are not cleaned up by Paimon, so retention and deletion should be managed externally.
 
-6. **Use Partitioning**: Partition your blob tables by date or other dimensions to improve query performance and data management.
+6. **Use Blob View to Avoid Copying BLOB Data**: Configure `blob-view-field` when a downstream table only needs to reference BLOB values from an upstream table.
+
+7. **Use Partitioning**: Partition your blob tables by date or other dimensions to improve query performance and data management.
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 8fb3ccf918..df225feba6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -26,6 +26,7 @@ import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.flink.function.BuiltInFunctions;
+import org.apache.paimon.flink.function.CatalogAwareFunction;
 import org.apache.paimon.flink.procedure.ProcedureUtil;
 import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
 import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
@@ -56,6 +57,7 @@ import org.apache.paimon.view.ViewImpl;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
@@ -111,6 +113,9 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.procedures.Procedure;
 import org.apache.flink.table.resource.ResourceType;
 import org.apache.flink.table.resource.ResourceUri;
@@ -184,6 +189,7 @@ public class FlinkCatalog extends AbstractCatalog {
 
     private final Catalog catalog;
     private final String name;
+    private final Options options;
 
     private final boolean disableCreateTableInDefaultDatabase;
 
@@ -192,6 +198,7 @@ public class FlinkCatalog extends AbstractCatalog {
         LOG.info("Creating Flink catalog: metastore={}", options.get(CatalogOptions.METASTORE));
         this.catalog = catalog;
         this.name = name;
+        this.options = new Options(options.toMap());
         this.disableCreateTableInDefaultDatabase = options.get(DISABLE_CREATE_TABLE_IN_DEFAULT_DB);
         if (!disableCreateTableInDefaultDatabase) {
             try {
@@ -214,6 +221,52 @@ public class FlinkCatalog extends AbstractCatalog {
         return Optional.of(new FlinkTableFactory(this));
     }
 
+    @Override
+    public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
+        return Optional.of(
+                new CatalogAwareFunctionDefinitionFactory(name, getDefaultDatabase(), options));
+    }
+
+    private static class CatalogAwareFunctionDefinitionFactory
+            implements FunctionDefinitionFactory {
+
+        private final String catalogName;
+        private final String defaultDatabase;
+        private final Options catalogOptions;
+
+        private CatalogAwareFunctionDefinitionFactory(
+                String catalogName, String defaultDatabase, Options catalogOptions) {
+            this.catalogName = catalogName;
+            this.defaultDatabase = defaultDatabase;
+            this.catalogOptions = new Options(catalogOptions.toMap());
+        }
+
+        @SuppressWarnings("deprecation")
+        public org.apache.flink.table.functions.FunctionDefinition createFunctionDefinition(
+                String functionName, CatalogFunction catalogFunction) {
+            return createFunctionDefinition(functionName, catalogFunction, null);
+        }
+
+        @Override
+        public org.apache.flink.table.functions.FunctionDefinition createFunctionDefinition(
+                String functionName,
+                CatalogFunction catalogFunction,
+                FunctionDefinitionFactory.Context context) {
+            ClassLoader classLoader =
+                    context == null
+                            ? Thread.currentThread().getContextClassLoader()
+                            : context.getClassLoader();
+            UserDefinedFunction function =
+                    UserDefinedFunctionHelper.instantiateFunction(
+                            classLoader, new Configuration(), functionName, catalogFunction);
+            if (function instanceof CatalogAwareFunction) {
+                ((CatalogAwareFunction) function)
+                        .setCatalogContext(catalogName, defaultDatabase, catalogOptions);
+            }
+            return function;
+        }
+    }
+
     @Override
     public List<String> listDatabases() throws CatalogException {
         return catalog.listDatabases();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
index 887bda71f8..0f9b4b33ab 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
@@ -18,13 +18,55 @@
 
 package org.apache.paimon.flink.function;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BlobViewStruct;
+import org.apache.paimon.flink.FlinkFileIOLoader;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
 
+import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.ScalarFunction;
 
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
 /** Flink scalar function that constructs a serialized {@link BlobViewStruct}. */
-public class BlobViewFunction extends ScalarFunction {
+public class BlobViewFunction extends ScalarFunction implements CatalogAwareFunction {
+
+    @Nullable private String catalogName;
+    @Nullable private Options catalogOptions;
+
+    private transient Catalog catalog;
+    private transient Map<String, Map<String, BlobViewField>> blobViewFieldCache;
+
+    @Override
+    public void setCatalogContext(
+            String catalogName, String defaultDatabase, Options catalogOptions) {
+        this.catalogName = catalogName;
+        this.catalogOptions = new Options(catalogOptions.toMap());
+        this.catalog = null;
+        this.blobViewFieldCache = null;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        openCatalog(context.getUserCodeClassLoader());
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (catalog != null) {
+            catalog.close();
+            catalog = null;
+        }
+    }
 
     public byte[] eval(String identifier, int fieldId, long rowId) {
         if (identifier == null) {
@@ -32,4 +74,112 @@ public class BlobViewFunction extends ScalarFunction {
         }
         return new BlobViewStruct(Identifier.fromString(identifier), fieldId, rowId).serialize();
     }
+
+    public byte[] eval(String tableName, String fieldName, long rowId) {
+        if (tableName == null || fieldName == null) {
+            return null;
+        }
+
+        BlobViewField field = blobViewField(tableName, fieldName);
+        return new BlobViewStruct(field.tableName, field.fieldId, rowId).serialize();
+    }
+
+    private BlobViewField blobViewField(String tableName, String fieldName) {
+        Map<String, BlobViewField> tableCache = blobViewFieldCache().get(tableName);
+        if (tableCache != null) {
+            BlobViewField cached = tableCache.get(fieldName);
+            if (cached != null) {
+                return cached;
+            }
+        }
+
+        Identifier identifier = toIdentifier(tableName);
+        BlobViewField field = new BlobViewField(identifier, fieldId(identifier, fieldName));
+        if (tableCache == null) {
+            tableCache = new HashMap<>();
+            blobViewFieldCache().put(tableName, tableCache);
+        }
+        tableCache.put(fieldName, field);
+        return field;
+    }
+
+    private int fieldId(Identifier identifier, String fieldName) {
+        try {
+            Table table = catalog().getTable(identifier);
+            if (!table.rowType().containsField(fieldName)) {
+                throw new IllegalArgumentException(
+                        "Cannot find blob field "
+                                + fieldName
+                                + " in upstream table "
+                                + identifier.getFullName()
+                                + ".");
+            }
+            DataField field = table.rowType().getField(fieldName);
+            if (!field.type().is(DataTypeRoot.BLOB)) {
+                throw new IllegalArgumentException(
+                        "Field "
+                                + fieldName
+                                + " in upstream table "
+                                + identifier.getFullName()
+                                + " is not a BLOB field.");
+            }
+            return field.id();
+        } catch (Catalog.TableNotExistException e) {
+            throw new IllegalArgumentException(
+                    "Cannot find upstream table " + identifier.getFullName() + ".", e);
+        }
+    }
+
+    private Identifier toIdentifier(String tableName) {
+        String[] parts = tableName.split("\\.", 3);
+        if (parts.length == 2) {
+            return new Identifier(parts[0], parts[1]);
+        }
+        if (catalogName != null && catalogName.equals(parts[0])) {
+            return new Identifier(parts[1], parts[2]);
+        }
+        throw new IllegalArgumentException(
+                "Table name must be 'database.table' or '"
+                        + catalogName
+                        + ".database.table', but is '"
+                        + tableName
+                        + "'.");
+    }
+
+    private Catalog catalog() {
+        if (catalog == null) {
+            openCatalog(Thread.currentThread().getContextClassLoader());
+        }
+        return catalog;
+    }
+
+    private Map<String, Map<String, BlobViewField>> blobViewFieldCache() {
+        if (blobViewFieldCache == null) {
+            blobViewFieldCache = new HashMap<>();
+        }
+        return blobViewFieldCache;
+    }
+
+    private void openCatalog(ClassLoader classLoader) {
+        if (catalog != null) {
+            return;
+        }
+        if (catalogOptions == null) {
+            throw new IllegalStateException("BlobViewFunction is missing catalog options.");
+        }
+        catalog =
+                CatalogFactory.createCatalog(
+                        CatalogContext.create(catalogOptions, new FlinkFileIOLoader()),
+                        classLoader);
+    }
+
+    private static class BlobViewField {
+        private final Identifier tableName;
+        private final int fieldId;
+
+        private BlobViewField(Identifier tableName, int fieldId) {
+            this.tableName = tableName;
+            this.fieldId = fieldId;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/CatalogAwareFunction.java
similarity index 61%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/CatalogAwareFunction.java
index 887bda71f8..4a810ec887 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/CatalogAwareFunction.java
@@ -18,18 +18,10 @@
 
 package org.apache.paimon.flink.function;
 
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BlobViewStruct;
+import org.apache.paimon.options.Options;
 
-import org.apache.flink.table.functions.ScalarFunction;
+/** Interface for Flink functions that need Paimon catalog context. */
+public interface CatalogAwareFunction {
 
-/** Flink scalar function that constructs a serialized {@link BlobViewStruct}. */
-public class BlobViewFunction extends ScalarFunction {
-
-    public byte[] eval(String identifier, int fieldId, long rowId) {
-        if (identifier == null) {
-            return null;
-        }
-        return new BlobViewStruct(Identifier.fromString(identifier), fieldId, rowId).serialize();
-    }
+    void setCatalogContext(String catalogName, String defaultDatabase, Options catalogOptions);
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 5ff2e133a4..01e91542c6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -185,12 +185,6 @@ public class BlobTableITCase extends CatalogITCaseBase {
         batchSql("INSERT INTO upstream_blob_view VALUES (1, 'row1', X'48656C6C6F')");
         batchSql("INSERT INTO upstream_blob_view VALUES (2, 'row2', X'5945')");
 
-        int pictureFieldId =
-                paimonTable("upstream_blob_view").rowType().getFields().stream()
-                        .filter(field -> field.name().equals("picture"))
-                        .findFirst()
-                        .orElseThrow(() -> new RuntimeException("picture field not found"))
-                        .id();
         String fullTableName = tEnv.getCurrentDatabase() + ".upstream_blob_view";
 
         tEnv.executeSql(
@@ -203,9 +197,9 @@ public class BlobTableITCase extends CatalogITCaseBase {
         batchSql(
                 String.format(
                         "INSERT INTO downstream_blob_view"
-                                + " SELECT id, name, sys.blob_view('%s', %d, _ROW_ID)"
+                                + " SELECT id, name, sys.blob_view('%s', 'picture', _ROW_ID)"
                                 + " FROM `upstream_blob_view$row_tracking`",
-                        fullTableName, pictureFieldId));
+                        fullTableName));
 
         List<Row> result = batchSql("SELECT * FROM downstream_blob_view ORDER BY id");
         assertThat(result).hasSize(2);
@@ -218,6 +212,44 @@ public class BlobTableITCase extends CatalogITCaseBase {
         assertThat((byte[]) result.get(1).getField(2)).isEqualTo(new byte[] {89, 69});
     }
 
+    @Test
+    public void testBlobViewRejectsUnqualifiedTableName() {
+        assertThatThrownBy(
+                        () ->
+                                batchSql(
+                                        "SELECT sys.blob_view("
+                                                + "'upstream_blob_view', "
+                                                + "'picture', "
+                                                + "CAST(0 AS BIGINT))"))
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "Table name must be 'database.table' or 'PAIMON.database.table', "
+                                + "but is 'upstream_blob_view'.");
+    }
+
+    @Test
+    public void testBlobViewRejectsNonBlobField() {
+        tEnv.executeSql(
+                "CREATE TABLE upstream_non_blob (id INT, picture BYTES)"
+                        + " WITH ('row-tracking.enabled'='true',"
+                        + " 'data-evolution.enabled'='true')");
+
+        String fullTableName = tEnv.getCurrentDatabase() + ".upstream_non_blob";
+        assertThatThrownBy(
+                        () ->
+                                batchSql(
+                                        "SELECT sys.blob_view("
+                                                + "'%s', "
+                                                + "'picture', "
+                                                + "CAST(0 AS BIGINT))",
+                                        fullTableName))
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "Field picture in upstream table "
+                                + fullTableName
+                                + " is not a BLOB field.");
+    }
+
     @Test
     public void testBlobInlineFieldRequiresBlobField() {
         assertSecondaryBlobFieldRequiresBlobField(
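
Note on table-name resolution: the new `sys.blob_view(table_name, field_name, row_id)` overload accepts `database.table` or `catalog.database.table` and rejects unqualified names. The stand-alone sketch below mirrors that rule without any Paimon or Flink dependency. The class name `BlobViewNameSketch`, the method `resolve`, and the `String[]` return type are hypothetical and exist only for illustration; the commit itself returns a Paimon `Identifier`. The explicit `parts.length == 3` guard is an assumption of this sketch, added so that a one-part name can never index past the array.

```java
import java.util.Arrays;

/** Hypothetical stand-alone sketch; not part of the Paimon code base. */
final class BlobViewNameSketch {

    /**
     * Resolves a table name to {database, table}.
     * Accepts "database.table" or "catalog.database.table" where the
     * catalog part equals catalogName; anything else is rejected.
     */
    static String[] resolve(String tableName, String catalogName) {
        // A limit of 3 keeps any further dots inside the last part.
        String[] parts = tableName.split("\\.", 3);
        if (parts.length == 2) {
            return new String[] {parts[0], parts[1]};
        }
        // Assumption of this sketch: require exactly three parts before
        // comparing the first part with the catalog name.
        if (parts.length == 3 && catalogName != null && catalogName.equals(parts[0])) {
            return new String[] {parts[1], parts[2]};
        }
        throw new IllegalArgumentException(
                "Table name must be 'database.table' or '"
                        + catalogName
                        + ".database.table', but is '"
                        + tableName
                        + "'.");
    }

    public static void main(String[] args) {
        // prints [default, image_table]
        System.out.println(Arrays.toString(resolve("default.image_table", "my_catalog")));
        // prints [default, image_table]
        System.out.println(
                Arrays.toString(resolve("my_catalog.default.image_table", "my_catalog")));
        try {
            resolve("image_table", "my_catalog");
        } catch (IllegalArgumentException e) {
            // An unqualified name is rejected with the message built above.
            System.out.println(e.getMessage());
        }
    }
}
```

A three-part name whose first part differs from the current catalog name is rejected as well, which matches the documented restriction that only the current Paimon catalog may appear as the prefix.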
