This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 94ebeb8e90 [spark] Resolve blob_view field name to field id at analysis time (#7836)
94ebeb8e90 is described below
commit 94ebeb8e90181647544e80fe6b7e008133161f50
Author: YeJunHao <[email protected]>
AuthorDate: Thu May 14 22:15:10 2026 +0800
[spark] Resolve blob_view field name to field id at analysis time (#7836)
---
...tion.java => BlobViewFieldIdSparkFunction.java} | 17 +++-
.../spark/function/BlobViewSparkFunction.java | 20 +++--
.../paimon/spark/function/BlobViewUnbound.java | 9 +-
.../catalyst/analysis/PaimonFunctionResolver.scala | 46 ++++++++++
.../catalyst/analysis/ReplacePaimonFunctions.scala | 97 ++++++++++++++++++++--
.../spark/commands/WriteIntoPaimonTable.scala | 9 +-
.../extensions/PaimonSparkSessionExtensions.scala | 1 +
.../org/apache/paimon/spark/sql/BlobTestBase.scala | 37 +++++++++
8 files changed, 211 insertions(+), 25 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewFieldIdSparkFunction.java
similarity index 75%
copy from paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java
copy to paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewFieldIdSparkFunction.java
index 9e941a3ad5..55ace9135b 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewFieldIdSparkFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.function;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BlobViewStruct;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
@@ -28,8 +29,8 @@ import org.apache.spark.unsafe.types.UTF8String;
import java.io.Serializable;
-/** Spark scalar function that constructs a serialized {@link BlobViewStruct}.
*/
-public class BlobViewSparkFunction implements ScalarFunction<byte[]>,
Serializable {
+/** Spark scalar function that constructs a serialized {@link BlobViewStruct}
by field id. */
+public class BlobViewFieldIdSparkFunction implements ScalarFunction<byte[]>,
Serializable {
@Override
public DataType[] inputTypes() {
@@ -45,6 +46,18 @@ public class BlobViewSparkFunction implements ScalarFunction<byte[]>, Serializab
if (identifier == null) {
return null;
}
+ return blobViewStruct(identifier, fieldId, rowId);
+ }
+
+ @Override
+ public byte[] produceResult(InternalRow input) {
+ if (input.isNullAt(0) || input.isNullAt(1) || input.isNullAt(2)) {
+ return null;
+ }
+ return blobViewStruct(input.getUTF8String(0), input.getInt(1),
input.getLong(2));
+ }
+
+ private byte[] blobViewStruct(UTF8String identifier, int fieldId, long
rowId) {
return new
BlobViewStruct(Identifier.fromString(identifier.toString()), fieldId, rowId)
.serialize();
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java
index 9e941a3ad5..2bcf9a317d 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java
@@ -18,9 +18,6 @@
package org.apache.paimon.spark.function;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BlobViewStruct;
-
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
@@ -28,12 +25,12 @@ import org.apache.spark.unsafe.types.UTF8String;
import java.io.Serializable;
-/** Spark scalar function that constructs a serialized {@link BlobViewStruct}.
*/
+/** Spark scalar function resolved by Spark analysis before execution. */
public class BlobViewSparkFunction implements ScalarFunction<byte[]>,
Serializable {
@Override
public DataType[] inputTypes() {
- return new DataType[] {DataTypes.StringType, DataTypes.IntegerType,
DataTypes.LongType};
+ return new DataType[] {DataTypes.StringType, DataTypes.StringType,
DataTypes.LongType};
}
@Override
@@ -41,16 +38,21 @@ public class BlobViewSparkFunction implements ScalarFunction<byte[]>, Serializab
return DataTypes.BinaryType;
}
- public byte[] invoke(UTF8String identifier, int fieldId, long rowId) {
- if (identifier == null) {
+ public byte[] invoke(UTF8String tableName, UTF8String fieldName, long
rowId) {
+ if (tableName == null || fieldName == null) {
return null;
}
- return new
BlobViewStruct(Identifier.fromString(identifier.toString()), fieldId, rowId)
- .serialize();
+ throw new UnsupportedOperationException(
+ "Function 'blob_view' requires literal tableName and
fieldName.");
}
@Override
public String name() {
return "blob_view";
}
+
+ @Override
+ public String canonicalName() {
+ return "paimon.blob_view(string, string, bigint)";
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewUnbound.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewUnbound.java
index d09f893a43..8477459efe 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewUnbound.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewUnbound.java
@@ -20,7 +20,6 @@ package org.apache.paimon.spark.function;
import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
-import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructType;
@@ -33,16 +32,16 @@ public class BlobViewUnbound implements UnboundFunction {
if (inputType.fields().length != 3) {
throw new UnsupportedOperationException(
"Function 'blob_view' requires 3 arguments "
- + "(identifier STRING, fieldId INT, rowId BIGINT),
but found "
+ + "(tableName STRING, fieldName STRING, rowId
BIGINT), but found "
+ inputType.fields().length);
}
if (!(inputType.fields()[0].dataType() instanceof StringType)) {
throw new UnsupportedOperationException(
"The first argument of 'blob_view' must be STRING type.");
}
- if (!(inputType.fields()[1].dataType() instanceof IntegerType)) {
+ if (!(inputType.fields()[1].dataType() instanceof StringType)) {
throw new UnsupportedOperationException(
- "The second argument of 'blob_view' must be INT type.");
+ "The second argument of 'blob_view' must be STRING type.");
}
if (!(inputType.fields()[2].dataType() instanceof LongType)) {
throw new UnsupportedOperationException(
@@ -53,7 +52,7 @@ public class BlobViewUnbound implements UnboundFunction {
@Override
public String description() {
- return "Construct a serialized BlobViewStruct from identifier, fieldId
and rowId";
+ return "Construct a serialized BlobViewStruct from tableName,
fieldName and rowId";
}
@Override
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
index 3fb76fa521..7a87cf7e31 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
@@ -18,13 +18,19 @@
package org.apache.paimon.spark.catalyst.analysis
+import org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME
import org.apache.paimon.spark.catalog.SupportV1Function
+import org.apache.paimon.spark.catalog.functions.PaimonFunctions
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import
org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_FUNCTION
+import org.apache.spark.sql.types.{LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
case class PaimonFunctionResolver(spark: SparkSession) extends
Rule[LogicalPlan] {
@@ -34,6 +40,9 @@ case class PaimonFunctionResolver(spark: SparkSession) extends Rule[LogicalPlan]
plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
case l: LogicalPlan =>
l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
+ case u: UnresolvedFunction
+ if isBlobViewFunction(u.nameParts) &&
u.arguments.forall(_.resolved) =>
+ resolveBlobView(u)
case u: UnResolvedPaimonV1Function if u.arguments.forall(_.resolved)
=>
u.funcIdent.catalog match {
case Some(catalog) =>
@@ -48,4 +57,41 @@ case class PaimonFunctionResolver(spark: SparkSession) extends Rule[LogicalPlan]
}
}
}
+
+ private def resolveBlobView(u: UnresolvedFunction): Expression = {
+ if (u.arguments.length != 3) {
+ throw new UnsupportedOperationException(
+ s"Function 'blob_view' requires 3 arguments " +
+ s"(tableName STRING, fieldName STRING, rowId BIGINT), but found
${u.arguments.length}")
+ }
+ val tableName = literalString(u.arguments(0), "tableName")
+ val fieldName = literalString(u.arguments(1), "fieldName")
+ if (u.arguments(2).dataType != LongType) {
+ throw new UnsupportedOperationException(
+ "The third argument of 'blob_view' must be BIGINT type.")
+ }
+ ReplacePaimonFunctions.resolveBlobView(spark, tableName, fieldName,
u.arguments(2))
+ }
+
+ private def literalString(expr: Expression, argumentName: String): String = {
+ if (!expr.isInstanceOf[Literal]) {
+ throw new UnsupportedOperationException(s"$argumentName must be a
literal")
+ }
+ if (expr.dataType != StringType) {
+ throw new UnsupportedOperationException(s"$argumentName must be STRING
type")
+ }
+
+ val value = expr.eval()
+ if (value == null) {
+ null
+ } else {
+ value.asInstanceOf[UTF8String].toString
+ }
+ }
+
+ private def isBlobViewFunction(nameParts: Seq[String]): Boolean = {
+ nameParts.length >= 2 &&
+ nameParts.last.equalsIgnoreCase(PaimonFunctions.BLOB_VIEW) &&
+ nameParts(nameParts.length - 2).equalsIgnoreCase(SYSTEM_DATABASE_NAME)
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala
index 3437966953..ff34fcb180 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala
@@ -20,18 +20,66 @@ package org.apache.paimon.spark.catalyst.analysis
import org.apache.paimon.spark.{DataConverter, SparkTable, SparkTypeUtils,
SparkUtils}
import org.apache.paimon.spark.catalog.SparkBaseCatalog
+import org.apache.paimon.spark.catalog.functions.PaimonFunctions
+import org.apache.paimon.spark.function.{BlobViewFieldIdSparkFunction,
BlobViewSparkFunction}
+import org.apache.paimon.spark.utils.CatalogUtils
+import org.apache.paimon.types.DataTypeRoot
import org.apache.paimon.utils.{InternalRowUtils, TypeUtils}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression,
Cast, Expression, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.expressions.objects.Invoke
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper,
LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.{BinaryType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
+object ReplacePaimonFunctions {
+
+ def resolveBlobView(
+ spark: SparkSession,
+ tableName: String,
+ fieldName: String,
+ rowId: Expression): Expression = {
+ if (tableName == null || fieldName == null) {
+ Literal(null, BinaryType)
+ } else {
+ val catalogAndIdentifier = SparkUtils
+ .catalogAndIdentifier(spark, tableName,
spark.sessionState.catalogManager.currentCatalog)
+ if (!catalogAndIdentifier.catalog().isInstanceOf[SparkBaseCatalog]) {
+ throw new UnsupportedOperationException(
+ s"${catalogAndIdentifier.catalog()} is not a Paimon catalog")
+ }
+
+ val table =
+
catalogAndIdentifier.catalog.asTableCatalog.loadTable(catalogAndIdentifier.identifier())
+ assert(table.isInstanceOf[SparkTable])
+ val sparkTable = table.asInstanceOf[SparkTable]
+ val paimonIdentifier =
+ CatalogUtils.toIdentifier(
+ catalogAndIdentifier.identifier(),
+ catalogAndIdentifier.catalog().name())
+ if (!sparkTable.table.rowType().containsField(fieldName)) {
+ throw new IllegalArgumentException(
+ s"Cannot find blob field $fieldName in upstream table
${paimonIdentifier.getFullName}.")
+ }
+ val field = sparkTable.table.rowType().getField(fieldName)
+ if (!field.`type`().is(DataTypeRoot.BLOB)) {
+ throw new IllegalArgumentException(
+ s"Field $fieldName in upstream table ${paimonIdentifier.getFullName}
" +
+ "is not a BLOB field.")
+ }
+
+ ApplyFunctionExpression(
+ new BlobViewFieldIdSparkFunction,
+ Seq(Literal(paimonIdentifier.getFullName), Literal(field.id()), rowId))
+ }
+ }
+}
+
/** A rule to replace Paimon functions with literal values. */
case class ReplacePaimonFunctions(spark: SparkSession) extends
Rule[LogicalPlan] {
private def replaceMaxPt(func: ApplyFunctionExpression): Expression = {
@@ -83,12 +131,47 @@ case class ReplacePaimonFunctions(spark: SparkSession) extends Rule[LogicalPlan]
Cast(literal, func.dataType)
}
+ private def replaceBlobView(arguments: Seq[Expression]): Expression = {
+ assert(arguments.size == 3)
+ val tableName = literalString(arguments(0), "tableName")
+ val fieldName = literalString(arguments(1), "fieldName")
+ ReplacePaimonFunctions.resolveBlobView(spark, tableName, fieldName,
arguments(2))
+ }
+
+ private def literalString(child: Expression, argumentName: String): String =
{
+ if (!child.isInstanceOf[Literal]) {
+ throw new UnsupportedOperationException(s"$argumentName must be a
literal")
+ }
+ val value = child.eval()
+ if (value == null) {
+ null
+ } else {
+ value.asInstanceOf[UTF8String].toString
+ }
+ }
+
+ private def isBlobViewInvoke(invoke: Invoke): Boolean = {
+ if (invoke.functionName != "invoke" || !invoke.targetObject.foldable) {
+ false
+ } else {
+ invoke.targetObject.eval().isInstanceOf[BlobViewSparkFunction]
+ }
+ }
+
override def apply(plan: LogicalPlan): LogicalPlan = {
- plan.resolveExpressions {
- case func: ApplyFunctionExpression
- if func.function.name() == "max_pt" &&
- func.function.canonicalName().startsWith("paimon") =>
- replaceMaxPt(func)
+ AnalysisHelper.allowInvokingTransformsInAnalyzer {
+ plan.transformAllExpressions {
+ case func: ApplyFunctionExpression
+ if func.function.name() == "max_pt" &&
+ func.function.canonicalName().startsWith("paimon") =>
+ replaceMaxPt(func)
+ case func: ApplyFunctionExpression
+ if func.function.name() == PaimonFunctions.BLOB_VIEW &&
+ func.function.canonicalName().startsWith("paimon") =>
+ replaceBlobView(func.children)
+ case invoke: Invoke if isBlobViewInvoke(invoke) =>
+ replaceBlobView(invoke.arguments)
+ }
}
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 4025725f02..623c88d72f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -21,11 +21,12 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
import org.apache.paimon.options.Options
import org.apache.paimon.spark._
+import org.apache.paimon.spark.catalyst.analysis.ReplacePaimonFunctions
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.table.FileStoreTable
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, PaimonUtils, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
@@ -44,7 +45,11 @@ case class WriteIntoPaimonTable(
with Logging {
override def run(sparkSession: SparkSession): Seq[Row] = {
- val data = mergeSchema(sparkSession, _data, options)
+ val replacedData =
+ PaimonUtils.createDataset(
+ sparkSession,
+ ReplacePaimonFunctions(sparkSession)(_data.queryExecution.analyzed))
+ val data = mergeSchema(sparkSession, replacedData, options)
val (dynamicPartitionOverwriteMode, overwritePartition) = parseSaveMode()
// use the extra options to rebuild the table object
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 82dfeed0d2..e433a5f7d4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -99,6 +99,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
}
// optimization rules
+ extensions.injectOptimizerRule(spark => ReplacePaimonFunctions(spark))
extensions.injectOptimizerRule(_ =>
OptimizeMetadataOnlyDeleteFromPaimonTable)
extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueries)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 38d9793d2d..46158ed4dc 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -200,6 +200,43 @@ class BlobTestBase extends PaimonSparkTestBase {
}
}
+ test("Blob: test write blob view with built-in function") {
+ withTable("upstream_blob_view", "downstream_blob_view") {
+ sql(
+ "CREATE TABLE upstream_blob_view (id INT, name STRING, picture BINARY)
" +
+ "TBLPROPERTIES (" +
+ "'row-tracking.enabled'='true', " +
+ "'data-evolution.enabled'='true', " +
+ "'blob-field'='picture')")
+ sql(
+ "INSERT INTO upstream_blob_view VALUES " +
+ "(1, 'row1', X'48656C6C6F'), " +
+ "(2, 'row2', X'5945')")
+
+ val upstreamFullName = s"$dbName0.upstream_blob_view"
+
+ sql(
+ "CREATE TABLE downstream_blob_view (id INT, label STRING, image_ref
BINARY) " +
+ "TBLPROPERTIES (" +
+ "'row-tracking.enabled'='true', " +
+ "'data-evolution.enabled'='true', " +
+ "'blob-field'='image_ref', " +
+ "'blob-view-field'='image_ref')")
+
+ sql(
+ s"INSERT INTO downstream_blob_view " +
+ s"SELECT id, name, sys.blob_view('$upstreamFullName', 'picture',
_ROW_ID) " +
+ s"FROM `upstream_blob_view$$row_tracking`")
+
+ checkAnswer(
+ sql("SELECT * FROM downstream_blob_view ORDER BY id"),
+ Seq(
+ Row(1, "row1", Array[Byte](72, 101, 108, 108, 111)),
+ Row(2, "row2", Array[Byte](89, 69)))
+ )
+ }
+ }
+
test("Blob: test write blob descriptor from external storage") {
val catalogName = "isolated_paimon"
val databaseName = "external_blob_db"