This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 175c9506da [core] Support catalog-aware blob view function (#7786)
175c9506da is described below
commit 175c9506da3a2fd3a6936d797f83233ec8d75d06
Author: YeJunHao <[email protected]>
AuthorDate: Sat May 9 12:58:06 2026 +0800
[core] Support catalog-aware blob view function (#7786)
---
docs/content/append-table/blob.md | 86 +++++++++++-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 53 +++++++
.../paimon/flink/function/BlobViewFunction.java | 152 ++++++++++++++++++++-
...ViewFunction.java => CatalogAwareFunction.java} | 16 +--
.../org/apache/paimon/flink/BlobTableITCase.java | 48 +++++--
5 files changed, 331 insertions(+), 24 deletions(-)
diff --git a/docs/content/append-table/blob.md b/docs/content/append-table/blob.md
index 8180fdf28e..73124964f1 100644
--- a/docs/content/append-table/blob.md
+++ b/docs/content/append-table/blob.md
@@ -71,7 +71,7 @@ For details about the blob file format structure, see [File Format - BLOB]({{< r
## Storage Modes
-Paimon supports three storage modes for BLOB fields:
+Paimon supports four storage modes for BLOB fields:
1. **Default blob storage**
Blob bytes are written to Paimon-managed `.blob` files under the table path.
@@ -82,7 +82,10 @@ Paimon supports three storage modes for BLOB fields:
3. **External-storage descriptor mode**
Fields configured in `blob-external-storage-field` are a subset of `blob-descriptor-field`. At write time, Paimon writes the raw blob data to the configured `blob-external-storage-path` and stores only serialized `BlobDescriptor` bytes inline in data files.
-This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, and descriptor-based BLOB fields backed by external storage.
+4. **Blob view storage**
+ Fields configured in `blob-view-field` store serialized `BlobViewStruct` bytes inline in data files. The struct points to a BLOB value in an upstream table by table identifier, BLOB field, and row id. The actual blob bytes are resolved from the upstream table at read time.
+
+This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, descriptor-based BLOB fields backed by external storage, and view fields that reference upstream BLOB values.
## Table Options
@@ -123,6 +126,17 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields,
some BLOB fields in <code>.blob</code> files and some as descriptor
references.
</td>
</tr>
+ <tr>
+ <td><h5>blob-view-field</h5></td>
+ <td>No</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>
+ Comma-separated BLOB field names stored as serialized <code>BlobViewStruct</code> bytes inline in normal data files.
+ The field values reference BLOB values in upstream tables and are resolved at read time.
+ This option must be a subset of <code>blob-field</code> and must not overlap with <code>blob-descriptor-field</code>.
+ </td>
+ </tr>
<tr>
<td><h5>blob-external-storage-field</h5></td>
<td>No</td>
@@ -279,6 +293,69 @@ ALTER TABLE blob_table SET ('blob-as-descriptor' = 'false');
SELECT image FROM blob_table;
```
+### Blob View
+
+Blob view is useful when a downstream table should reference BLOB values already stored in an upstream table, without copying the bytes or creating new `.blob` files. A blob view field stores only a small `BlobViewStruct` inline. When the field is read, Paimon resolves the referenced BLOB from the upstream table.
+
+Blob view requires:
+
+- the upstream table to have row tracking enabled, so each row has a stable `_ROW_ID`
+- the downstream field to be listed in both `blob-field` and `blob-view-field`
+- writes to provide a serialized `BlobViewStruct`; in Flink SQL, use the built-in `sys.blob_view` function
+
+The Flink SQL function signature is:
+
+```sql
+sys.blob_view(table_name, field_name, row_id)
+```
+
+Arguments:
+
+- `table_name`: the upstream table name, qualified as `database.table` or `catalog.database.table`. Unqualified table names are rejected.
+- `field_name`: the name of a BLOB field in the upstream table. Missing fields and non-BLOB fields are rejected.
+- `row_id`: the `_ROW_ID` value from the upstream row-tracking table.
+
+The following example writes a downstream table whose `image_ref` field references the `image` field of `image_table`:
+
+```sql
+CREATE TABLE image_table (
+ id INT,
+ name STRING,
+ image BYTES
+) WITH (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true',
+ 'blob-field' = 'image'
+);
+
+CREATE TABLE image_view_table (
+ id INT,
+ label STRING,
+ image_ref BYTES
+) WITH (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true',
+ 'blob-field' = 'image_ref',
+ 'blob-view-field' = 'image_ref'
+);
+
+INSERT INTO image_view_table
+SELECT
+ id,
+ name AS label,
+ sys.blob_view('default.image_table', 'image', _ROW_ID)
+FROM `image_table$row_tracking`;
+```
+
+A three-part `catalog.database.table` name is also accepted, but only when `catalog` is the name of the current Paimon catalog:
+
+```sql
+SELECT sys.blob_view('my_catalog.default.image_table', 'image', _ROW_ID)
+FROM `image_table$row_tracking`;
+```
+
+Reads from `image_view_table.image_ref` return the referenced BLOB bytes, just as reads of a normal blob field do. The referenced upstream table and row must remain available for the view to be resolved.
+
### MERGE INTO Support
For Data Evolution writes in Flink and Spark:
@@ -643,6 +720,7 @@ For these configured fields:
3. **No Statistics**: Statistics collection is not supported for blob columns.
4. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled` must be set to `true`.
5. **External Storage Cleanup**: Files written through `blob-external-storage-path` are outside Paimon's orphan file cleanup scope.
+6. **Blob View Dependency**: Blob view fields depend on the referenced upstream table and row. If the upstream data is removed or no longer readable, the view cannot be resolved.
## Best Practices
@@ -656,4 +734,6 @@ For these configured fields:
5. **Manage External Storage Lifecycle Separately**: Files written to `blob-external-storage-path` are not cleaned up by Paimon, so retention and deletion should be managed externally.
-6. **Use Partitioning**: Partition your blob tables by date or other dimensions to improve query performance and data management.
+6. **Use Blob View to Avoid Copying BLOB Data**: Configure `blob-view-field` when a downstream table only needs to reference BLOB values from an upstream table.
+
+7. **Use Partitioning**: Partition your blob tables by date or other dimensions to improve query performance and data management.
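The table-name rule documented above for `sys.blob_view` accepts either `database.table` or a three-part name whose first part is the Paimon catalog's own name. The standalone Java sketch below illustrates that rule, under a few assumptions. `BlobViewTableName` and `resolve` are hypothetical names, not Paimon API. The sketch returns `{database, table}` instead of a Paimon `Identifier`, so it runs without dependencies. It also checks for exactly three parts before comparing the catalog name, so an unqualified name is always rejected with an `IllegalArgumentException`.

```java
import java.util.Arrays;

// Hypothetical helper (not Paimon API) that mirrors the documented
// sys.blob_view table-name rule and returns {database, table}.
final class BlobViewTableName {

    private BlobViewTableName() {}

    static String[] resolve(String tableName, String catalogName) {
        // Limit 3: any dots after the second one stay in the last part.
        String[] parts = tableName.split("\\.", 3);
        if (parts.length == 2) {
            // database.table
            return parts;
        }
        // catalog.database.table is accepted only for the owning catalog.
        if (parts.length == 3 && parts[0].equals(catalogName)) {
            return new String[] {parts[1], parts[2]};
        }
        throw new IllegalArgumentException(
                "Table name must be 'database.table' or '"
                        + catalogName
                        + ".database.table', but is '"
                        + tableName
                        + "'.");
    }

    public static void main(String[] args) {
        // Both calls print [default, image_table].
        System.out.println(Arrays.toString(resolve("default.image_table", "my_catalog")));
        System.out.println(
                Arrays.toString(resolve("my_catalog.default.image_table", "my_catalog")));
    }
}
```

Because the split uses a limit of 3, any dots after the second one stay in the last part. With catalog `my_catalog`, for example, the name `my_catalog.db.t.x` resolves to database `db` and table `t.x`.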
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 8fb3ccf918..df225feba6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -26,6 +26,7 @@ import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.flink.function.BuiltInFunctions;
+import org.apache.paimon.flink.function.CatalogAwareFunction;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
@@ -56,6 +57,7 @@ import org.apache.paimon.view.ViewImpl;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -111,6 +113,9 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
@@ -184,6 +189,7 @@ public class FlinkCatalog extends AbstractCatalog {
private final Catalog catalog;
private final String name;
+ private final Options options;
private final boolean disableCreateTableInDefaultDatabase;
@@ -192,6 +198,7 @@ public class FlinkCatalog extends AbstractCatalog {
LOG.info("Creating Flink catalog: metastore={}", options.get(CatalogOptions.METASTORE));
this.catalog = catalog;
this.name = name;
+ this.options = new Options(options.toMap());
this.disableCreateTableInDefaultDatabase =
options.get(DISABLE_CREATE_TABLE_IN_DEFAULT_DB);
if (!disableCreateTableInDefaultDatabase) {
try {
@@ -214,6 +221,52 @@ public class FlinkCatalog extends AbstractCatalog {
return Optional.of(new FlinkTableFactory(this));
}
+ @Override
+ public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
+ return Optional.of(
+ new CatalogAwareFunctionDefinitionFactory(name, getDefaultDatabase(), options));
+ }
+
+ private static class CatalogAwareFunctionDefinitionFactory
+ implements FunctionDefinitionFactory {
+
+ private final String catalogName;
+ private final String defaultDatabase;
+ private final Options catalogOptions;
+
+ private CatalogAwareFunctionDefinitionFactory(
+ String catalogName, String defaultDatabase, Options catalogOptions) {
+ this.catalogName = catalogName;
+ this.defaultDatabase = defaultDatabase;
+ this.catalogOptions = new Options(catalogOptions.toMap());
+ }
+
+ @SuppressWarnings("deprecation")
+ public org.apache.flink.table.functions.FunctionDefinition createFunctionDefinition(
+ String functionName, CatalogFunction catalogFunction) {
+ return createFunctionDefinition(functionName, catalogFunction, null);
+ }
+
+ @Override
+ public org.apache.flink.table.functions.FunctionDefinition createFunctionDefinition(
+ String functionName,
+ CatalogFunction catalogFunction,
+ FunctionDefinitionFactory.Context context) {
+ ClassLoader classLoader =
+ context == null
+ ? Thread.currentThread().getContextClassLoader()
+ : context.getClassLoader();
+ UserDefinedFunction function =
+ UserDefinedFunctionHelper.instantiateFunction(
+ classLoader, new Configuration(), functionName, catalogFunction);
+ if (function instanceof CatalogAwareFunction) {
+ ((CatalogAwareFunction) function)
+ .setCatalogContext(catalogName, defaultDatabase, catalogOptions);
+ }
+ return function;
+ }
+ }
+
@Override
public List<String> listDatabases() throws CatalogException {
return catalog.listDatabases();
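`CatalogAwareFunctionDefinitionFactory` above follows a simple injection pattern. It instantiates the function and then passes it the catalog name, default database, and catalog options, but only if the function implements `CatalogAwareFunction`. The following is a minimal, dependency-free sketch of that pattern. `CatalogContextAware`, `ContextInjectingFactory`, and `RecordingFunction` are hypothetical names. A `Supplier` stands in for Flink's `UserDefinedFunctionHelper.instantiateFunction`, and a plain `Map` stands in for Paimon's `Options`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for CatalogAwareFunction; a plain Map replaces Paimon's Options.
interface CatalogContextAware {
    void setCatalogContext(
            String catalogName, String defaultDatabase, Map<String, String> catalogOptions);
}

// Hypothetical factory: creates a function instance and injects the catalog
// context only when the instance opts in by implementing CatalogContextAware.
final class ContextInjectingFactory {

    private final String catalogName;
    private final String defaultDatabase;
    private final Map<String, String> catalogOptions;

    ContextInjectingFactory(
            String catalogName, String defaultDatabase, Map<String, String> catalogOptions) {
        this.catalogName = catalogName;
        this.defaultDatabase = defaultDatabase;
        // Defensive copy so later changes by the caller are not observed.
        this.catalogOptions = new HashMap<>(catalogOptions);
    }

    Object create(Supplier<?> instantiate) {
        Object function = instantiate.get();
        if (function instanceof CatalogContextAware) {
            // Each function gets its own copy of the options.
            ((CatalogContextAware) function)
                    .setCatalogContext(catalogName, defaultDatabase, new HashMap<>(catalogOptions));
        }
        // Functions that do not opt in are returned untouched.
        return function;
    }
}

// Example function that simply records the context it was given.
final class RecordingFunction implements CatalogContextAware {

    String catalogName;
    String defaultDatabase;
    Map<String, String> catalogOptions;

    @Override
    public void setCatalogContext(
            String catalogName, String defaultDatabase, Map<String, String> catalogOptions) {
        this.catalogName = catalogName;
        this.defaultDatabase = defaultDatabase;
        this.catalogOptions = catalogOptions;
    }
}
```

Because the context is delivered through an opt-in interface, the factory can serve every catalog function. Only functions that declare the interface, such as `BlobViewFunction`, receive the context; all others are returned exactly as instantiated.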
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
index 887bda71f8..0f9b4b33ab 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
@@ -18,13 +18,55 @@
package org.apache.paimon.flink.function;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BlobViewStruct;
+import org.apache.paimon.flink.FlinkFileIOLoader;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
/** Flink scalar function that constructs a serialized {@link BlobViewStruct}. */
-public class BlobViewFunction extends ScalarFunction {
+public class BlobViewFunction extends ScalarFunction implements CatalogAwareFunction {
+
+ @Nullable private String catalogName;
+ @Nullable private Options catalogOptions;
+
+ private transient Catalog catalog;
+ private transient Map<String, Map<String, BlobViewField>> blobViewFieldCache;
+
+ @Override
+ public void setCatalogContext(
+ String catalogName, String defaultDatabase, Options catalogOptions) {
+ this.catalogName = catalogName;
+ this.catalogOptions = new Options(catalogOptions.toMap());
+ this.catalog = null;
+ this.blobViewFieldCache = null;
+ }
+
+ @Override
+ public void open(FunctionContext context) {
+ openCatalog(context.getUserCodeClassLoader());
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (catalog != null) {
+ catalog.close();
+ catalog = null;
+ }
+ }
public byte[] eval(String identifier, int fieldId, long rowId) {
if (identifier == null) {
@@ -32,4 +74,112 @@ public class BlobViewFunction extends ScalarFunction {
}
return new BlobViewStruct(Identifier.fromString(identifier), fieldId, rowId).serialize();
}
+
+ public byte[] eval(String tableName, String fieldName, long rowId) {
+ if (tableName == null || fieldName == null) {
+ return null;
+ }
+
+ BlobViewField field = blobViewField(tableName, fieldName);
+ return new BlobViewStruct(field.tableName, field.fieldId, rowId).serialize();
+ }
+
+ private BlobViewField blobViewField(String tableName, String fieldName) {
+ Map<String, BlobViewField> tableCache = blobViewFieldCache().get(tableName);
+ if (tableCache != null) {
+ BlobViewField cached = tableCache.get(fieldName);
+ if (cached != null) {
+ return cached;
+ }
+ }
+
+ Identifier identifier = toIdentifier(tableName);
+ BlobViewField field = new BlobViewField(identifier, fieldId(identifier, fieldName));
+ if (tableCache == null) {
+ tableCache = new HashMap<>();
+ blobViewFieldCache().put(tableName, tableCache);
+ }
+ tableCache.put(fieldName, field);
+ return field;
+ }
+
+ private int fieldId(Identifier identifier, String fieldName) {
+ try {
+ Table table = catalog().getTable(identifier);
+ if (!table.rowType().containsField(fieldName)) {
+ throw new IllegalArgumentException(
+ "Cannot find blob field "
+ + fieldName
+ + " in upstream table "
+ + identifier.getFullName()
+ + ".");
+ }
+ DataField field = table.rowType().getField(fieldName);
+ if (!field.type().is(DataTypeRoot.BLOB)) {
+ throw new IllegalArgumentException(
+ "Field "
+ + fieldName
+ + " in upstream table "
+ + identifier.getFullName()
+ + " is not a BLOB field.");
+ }
+ return field.id();
+ } catch (Catalog.TableNotExistException e) {
+ throw new IllegalArgumentException(
+ "Cannot find upstream table " + identifier.getFullName() + ".", e);
+ }
+ }
+
+ private Identifier toIdentifier(String tableName) {
+ String[] parts = tableName.split("\\.", 3);
+ if (parts.length == 2) {
+ return new Identifier(parts[0], parts[1]);
+ }
+ if (parts.length == 3 && catalogName != null && catalogName.equals(parts[0])) {
+ return new Identifier(parts[1], parts[2]);
+ }
+ throw new IllegalArgumentException(
+ "Table name must be 'database.table' or '"
+ + catalogName
+ + ".database.table', but is '"
+ + tableName
+ + "'.");
+ }
+
+ private Catalog catalog() {
+ if (catalog == null) {
+ openCatalog(Thread.currentThread().getContextClassLoader());
+ }
+ return catalog;
+ }
+
+ private Map<String, Map<String, BlobViewField>> blobViewFieldCache() {
+ if (blobViewFieldCache == null) {
+ blobViewFieldCache = new HashMap<>();
+ }
+ return blobViewFieldCache;
+ }
+
+ private void openCatalog(ClassLoader classLoader) {
+ if (catalog != null) {
+ return;
+ }
+ if (catalogOptions == null) {
+ throw new IllegalStateException("BlobViewFunction is missing catalog options.");
+ }
+ catalog =
+ CatalogFactory.createCatalog(
+ CatalogContext.create(catalogOptions, new FlinkFileIOLoader()),
+ classLoader);
+ }
+
+ private static class BlobViewField {
+ private final Identifier tableName;
+ private final int fieldId;
+
+ private BlobViewField(Identifier tableName, int fieldId) {
+ this.tableName = tableName;
+ this.fieldId = fieldId;
+ }
+ }
}
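`BlobViewFunction` above caches resolved field ids in a nested `HashMap`. The outer key is the raw table name and the inner key is the field name, so each (table, field) pair hits the catalog only once per function instance. The same two-level cache can be written more compactly with `Map.computeIfAbsent`. The sketch below shows this under stated assumptions and is not a drop-in replacement. `FieldIdCache` and its `resolver` are hypothetical names, and the resolver stands in for the `catalog().getTable(...)` lookup and BLOB-type check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch of a two-level (table name -> field name -> field id) cache,
// written with computeIfAbsent instead of explicit null checks.
final class FieldIdCache {

    private final Map<String, Map<String, Integer>> cache = new HashMap<>();
    // Stands in for the catalog lookup; must return a non-null id or throw.
    private final BiFunction<String, String, Integer> resolver;

    FieldIdCache(BiFunction<String, String, Integer> resolver) {
        this.resolver = resolver;
    }

    // Not thread-safe: assumes a single thread per instance, like a Flink
    // function instance that is only called from its task thread.
    // A lookup that throws leaves nothing cached for that field.
    int fieldId(String tableName, String fieldName) {
        return cache.computeIfAbsent(tableName, table -> new HashMap<>())
                .computeIfAbsent(fieldName, field -> resolver.apply(tableName, field));
    }
}
```

Like the committed code, the sketch keys on the raw table-name string. As a result, `default.image_table` and `my_catalog.default.image_table` are cached as separate entries even though they name the same table.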
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/CatalogAwareFunction.java
similarity index 61%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/CatalogAwareFunction.java
index 887bda71f8..4a810ec887 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/CatalogAwareFunction.java
@@ -18,18 +18,10 @@
package org.apache.paimon.flink.function;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BlobViewStruct;
+import org.apache.paimon.options.Options;
-import org.apache.flink.table.functions.ScalarFunction;
+/** Interface for Flink functions that need Paimon catalog context. */
+public interface CatalogAwareFunction {
-/** Flink scalar function that constructs a serialized {@link BlobViewStruct}. */
-public class BlobViewFunction extends ScalarFunction {
-
- public byte[] eval(String identifier, int fieldId, long rowId) {
- if (identifier == null) {
- return null;
- }
- return new BlobViewStruct(Identifier.fromString(identifier), fieldId, rowId).serialize();
- }
+ void setCatalogContext(String catalogName, String defaultDatabase, Options catalogOptions);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 5ff2e133a4..01e91542c6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -185,12 +185,6 @@ public class BlobTableITCase extends CatalogITCaseBase {
batchSql("INSERT INTO upstream_blob_view VALUES (1, 'row1', X'48656C6C6F')");
batchSql("INSERT INTO upstream_blob_view VALUES (2, 'row2', X'5945')");
- int pictureFieldId =
- paimonTable("upstream_blob_view").rowType().getFields().stream()
- .filter(field -> field.name().equals("picture"))
- .findFirst()
- .orElseThrow(() -> new RuntimeException("picture field not found"))
- .id();
String fullTableName = tEnv.getCurrentDatabase() + ".upstream_blob_view";
tEnv.executeSql(
@@ -203,9 +197,9 @@ public class BlobTableITCase extends CatalogITCaseBase {
batchSql(
String.format(
"INSERT INTO downstream_blob_view"
- + " SELECT id, name, sys.blob_view('%s', %d, _ROW_ID)"
+ + " SELECT id, name, sys.blob_view('%s', 'picture', _ROW_ID)"
+ " FROM `upstream_blob_view$row_tracking`",
- fullTableName, pictureFieldId));
+ fullTableName));
List<Row> result = batchSql("SELECT * FROM downstream_blob_view ORDER BY id");
assertThat(result).hasSize(2);
@@ -218,6 +212,44 @@ public class BlobTableITCase extends CatalogITCaseBase {
assertThat((byte[]) result.get(1).getField(2)).isEqualTo(new byte[] {89, 69});
}
+ @Test
+ public void testBlobViewRejectsUnqualifiedTableName() {
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "SELECT sys.blob_view("
+ + "'upstream_blob_view', "
+ + "'picture', "
+ + "CAST(0 AS BIGINT))"))
+ .hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Table name must be 'database.table' or 'PAIMON.database.table', "
+ + "but is 'upstream_blob_view'.");
+ }
+
+ @Test
+ public void testBlobViewRejectsNonBlobField() {
+ tEnv.executeSql(
+ "CREATE TABLE upstream_non_blob (id INT, picture BYTES)"
+ + " WITH ('row-tracking.enabled'='true',"
+ + " 'data-evolution.enabled'='true')");
+
+ String fullTableName = tEnv.getCurrentDatabase() + ".upstream_non_blob";
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "SELECT sys.blob_view("
+ + "'%s', "
+ + "'picture', "
+ + "CAST(0 AS BIGINT))",
+ fullTableName))
+ .hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Field picture in upstream table "
+ + fullTableName
+ + " is not a BLOB field.");
+ }
+
@Test
public void testBlobInlineFieldRequiresBlobField() {
assertSecondaryBlobFieldRequiresBlobField(