This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 2df6a06b93e6bbcc18019d73155797b2c01883da Author: Zouxxyy <[email protected]> AuthorDate: Sun Sep 28 16:04:47 2025 +0800 [spark] Improve error msg for creating a function on an existing tmp function (#6343) --- .../extensions/RewritePaimonFunctionCommands.scala | 45 +++++++++++++++------ .../spark/sql/PaimonV1FunctionTestBase.scala | 47 +++++++++++++++++++--- 2 files changed, 74 insertions(+), 18 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala index bad7fe3c62..ddbd9df5ac 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.parser.extensions +import org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME import org.apache.paimon.function.{Function => PaimonFunction} import org.apache.paimon.function.FunctionDefinition import org.apache.paimon.spark.SparkCatalog.FUNCTION_DEFINITION_NAME @@ -50,8 +51,17 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) } val applied = plan.resolveOperatorsUp { + case CreateFunction(CatalogAndFunctionIdentifier(_, funcIdent, true), _, _, _, replace) => + if (replace) { + throw new UnsupportedOperationException( + s"$funcIdent is a temporary function, you should use `CREATE OR REPLACE TEMPORARY FUNCTION $funcIdent` or `DROP TEMPORARY FUNCTION $funcIdent`.") + } else { + throw new UnsupportedOperationException( + s"$funcIdent is a temporary function and already exists.") + } + case CreateFunction( - CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent), + 
CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent, false), className, resources, ifExists, @@ -63,7 +73,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) CreatePaimonV1FunctionCommand(v1FunctionCatalog, v1Function, ifExists, replace) case DropFunction( - CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent), + CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent, false), ifExists) => if (isPaimonBuildInFunction(funcIdent)) { throw new UnsupportedOperationException(s"Can't drop build-in function: $funcIdent") @@ -72,7 +82,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) DropPaimonV1FunctionCommand(v1FunctionCatalog, funcIdent, ifExists) case d @ DescribeFunction( - CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent), + CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent, false), isExtended) // For Paimon built-in functions, Spark will resolve them by itself. if !isPaimonBuildInFunction(funcIdent) => @@ -98,7 +108,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) { case u: UnresolvedFunction => CatalogAndFunctionIdentifier.unapply(u.nameParts) match { - case Some((v1FunctionCatalog: SupportV1Function, funcIdent)) + case Some((v1FunctionCatalog: SupportV1Function, funcIdent, false)) // For Paimon built-in functions, Spark will resolve them by itself. if !isPaimonBuildInFunction(funcIdent) => // If the function is already registered, avoid redundant lookup in the catalog to reduce overhead. 
@@ -120,7 +130,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) private object CatalogAndFunctionIdentifier { - def unapply(unresolved: LogicalPlan): Option[(CatalogPlugin, FunctionIdentifier)] = + def unapply(unresolved: LogicalPlan): Option[(CatalogPlugin, FunctionIdentifier, Boolean)] = unresolved match { case ui: UnresolvedIdentifier => unapply(ui.nameParts) @@ -130,11 +140,13 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) None } - def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, FunctionIdentifier)] = { + def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, FunctionIdentifier, Boolean)] = { nameParts match { // Spark's built-in or tmp functions is without database name or catalog name. - case Seq(funName) if isSparkBuiltInOrTmpFunction(FunctionIdentifier(funName)) => + case Seq(funName) if isSparkBuiltInFunction(FunctionIdentifier(funName)) => None + case Seq(funName) if isSparkTmpFunc(FunctionIdentifier(funName)) => + Some(null, FunctionIdentifier(funName), true) case CatalogAndIdentifier(v1FunctionCatalog: SupportV1Function, ident) if v1FunctionCatalog.v1FunctionEnabled() => Some( @@ -142,7 +154,8 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) FunctionIdentifier( ident.name(), Some(ident.namespace().last), - Some(v1FunctionCatalog.name))) + Some(v1FunctionCatalog.name)), + false) case _ => None } @@ -150,12 +163,20 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) } private def isPaimonBuildInFunction(funcIdent: FunctionIdentifier): Boolean = { - PaimonFunctions.names.contains(funcIdent.funcName) + funcIdent.database match { + case Some(db) + if db == SYSTEM_DATABASE_NAME && PaimonFunctions.names.contains(funcIdent.funcName) => + true + case _ => false + } + } + + private def isSparkBuiltInFunction(funcIdent: FunctionIdentifier): Boolean = { + catalogManager.v1SessionCatalog.isBuiltinFunction(funcIdent) } - private def isSparkBuiltInOrTmpFunction(funcIdent: 
FunctionIdentifier): Boolean = { - catalogManager.v1SessionCatalog.isBuiltinFunction(funcIdent) || catalogManager.v1SessionCatalog - .isTemporaryFunction(funcIdent) + private def isSparkTmpFunc(funcIdent: FunctionIdentifier): Boolean = { + catalogManager.v1SessionCatalog.isTemporaryFunction(funcIdent) } private def isPaimonV1Function(fun: PaimonFunction): Boolean = { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala index eec9a1acb7..09aa98d93a 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala @@ -145,17 +145,17 @@ abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBa test("Paimon V1 Function: unsupported operation") { // create a build-in function - intercept[Exception] { + assert(intercept[Exception] { sql(s""" - |CREATE FUNCTION max_pt AS '$UDFExampleAdd2Class' + |CREATE FUNCTION sys.max_pt AS '$UDFExampleAdd2Class' |USING JAR '$testUDFJarPath' |""".stripMargin) - } + }.getMessage.contains("Can't create build-in function")) // drop a build-in function - intercept[Exception] { - sql("DROP FUNCTION max_pt") - } + assert(intercept[Exception] { + sql("DROP FUNCTION sys.max_pt") + }.getMessage.contains("Can't drop build-in function")) } test("Paimon V1 Function: user defined aggregate function") { @@ -231,6 +231,41 @@ abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBa } } } + + test("Paimon V1 Function: create or drop function on an existing temporary function") { + withUserDefinedFunction("udf_add2" -> true) { + sql(s""" + |CREATE TEMPORARY FUNCTION udf_add2 AS '$UDFExampleAdd2Class' + |USING JAR '$testUDFJarPath' + |""".stripMargin) + + assert(intercept[Exception] { + 
sql(s""" + |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class' + |USING JAR '$testUDFJarPath' + |""".stripMargin) + }.getMessage.contains("udf_add2 is a temporary function and already exists")) + + assert(intercept[Exception] { + sql(s""" + |CREATE OR REPLACE FUNCTION udf_add2 AS '$UDFExampleAdd2Class' + |USING JAR '$testUDFJarPath' + |""".stripMargin) + }.getMessage.contains( + "udf_add2 is a temporary function, you should use `CREATE OR REPLACE TEMPORARY FUNCTION udf_add2`")) + + sql(s""" + |CREATE OR REPLACE TEMPORARY FUNCTION udf_add2 AS '$UDFExampleAdd2Class' + |USING JAR '$testUDFJarPath' + |""".stripMargin) + + assert(intercept[Exception] { + sql(s""" + |DROP FUNCTION udf_add2 + |""".stripMargin) + }.getMessage.contains("udf_add2 is a built-in/temporary function")) + } + } } class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {
