This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit b7f49f84aa0859784d3ffc190ae578f22ee1413a
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 24 00:37:04 2025 +0800

    [spark] System function max_pt should be used as 'sys.max_pt' (#6312)
---
 docs/content/spark/sql-functions.md                |  9 ++---
 .../java/org/apache/paimon/spark/SparkCatalog.java | 42 +++++++++++----------
 .../paimon/spark/catalog/SparkBaseCatalog.java     | 10 +++--
 .../paimon/spark/SparkCatalogWithRestTest.java     |  3 ++
 .../catalog/functions/BucketFunctionTest.java      |  4 +-
 .../spark/benchmark/BucketFunctionBenchmark.scala  | 44 ----------------------
 .../paimon/spark/sql/PaimonFunctionTest.scala      |  6 +--
 .../spark/sql/PaimonV1FunctionTestBase.scala       |  2 +-
 8 files changed, 41 insertions(+), 79 deletions(-)

diff --git a/docs/content/spark/sql-functions.md b/docs/content/spark/sql-functions.md
index 65019ac5fd..99f4dcb153 100644
--- a/docs/content/spark/sql-functions.md
+++ b/docs/content/spark/sql-functions.md
@@ -28,12 +28,11 @@ under the License.
 
 This section introduce all available Paimon Spark functions.
 
-
 ## Built-in Function
 
 ### max_pt
 
-`max_pt($table_name)`
+`sys.max_pt($table_name)`
 
 It accepts a string type literal to specify the table name and return a max-valid-toplevel partition value.
 - **valid**: the partition which contains data files
@@ -47,15 +46,13 @@ It would throw exception when:
 **Example**
 
 ```sql
-SELECT max_pt('t');
+SELECT sys.max_pt('t');
 -- 20250101
 
-SELECT * FROM t where pt = max_pt('t');
+SELECT * FROM t where pt = sys.max_pt('t');
 -- a, 20250101
 ```
 
-**Since: 1.1.0**
-
 ## User-defined Function
 
 Paimon Spark supports two types of user-defined functions: lambda functions and file-based functions.
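For context, a minimal usage sketch of the renamed call against a Spark session, reusing the table `t` and partition column `pt` from the documentation example above (the current catalog is assumed to be a Paimon catalog):

```scala
// After this change the built-in must be qualified with the `sys` database;
// the bare form max_pt('t') no longer resolves.
spark.sql("SELECT sys.max_pt('t')").show() // e.g. 20250101

// Typical partition-pruning pattern with the new qualified name:
spark.sql("SELECT * FROM t WHERE pt = sys.max_pt('t')").show()
```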
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 1df1d66f80..db7281de92 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -524,34 +524,33 @@ public class SparkCatalog extends SparkBaseCatalog
 
     @Override
     public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
-        if (isFunctionNamespace(namespace)) {
-            List<Identifier> functionIdentifiers = new ArrayList<>();
-            PaimonFunctions.names()
-                    .forEach(name -> functionIdentifiers.add(Identifier.of(namespace, name)));
-            if (namespace.length > 0) {
-                String databaseName = getDatabaseNameFromNamespace(namespace);
-                try {
-                    catalog.listFunctions(databaseName)
-                            .forEach(
-                                    name ->
-                                            functionIdentifiers.add(
-                                                    Identifier.of(namespace, name)));
-                } catch (Catalog.DatabaseNotExistException e) {
-                    throw new NoSuchNamespaceException(namespace);
-                }
+        if (isSystemFunctionNamespace(namespace)) {
+            List<Identifier> result = new ArrayList<>();
+            PaimonFunctions.names().forEach(name -> result.add(Identifier.of(namespace, name)));
+            return result.toArray(new Identifier[0]);
+        } else if (isDatabaseFunctionNamespace(namespace)) {
+            List<Identifier> result = new ArrayList<>();
+            String databaseName = getDatabaseNameFromNamespace(namespace);
+            try {
+                catalog.listFunctions(databaseName)
+                        .forEach(name -> result.add(Identifier.of(namespace, name)));
+            } catch (Catalog.DatabaseNotExistException e) {
+                throw new NoSuchNamespaceException(namespace);
             }
-            return functionIdentifiers.toArray(new Identifier[0]);
+            return result.toArray(new Identifier[0]);
         }
         throw new NoSuchNamespaceException(namespace);
     }
 
     @Override
     public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
-        if (isFunctionNamespace(ident.namespace())) {
+        String[] namespace = ident.namespace();
+        if (isSystemFunctionNamespace(namespace)) {
             UnboundFunction func = PaimonFunctions.load(ident.name());
             if (func != null) {
                 return func;
             }
+        } else if (isDatabaseFunctionNamespace(namespace)) {
             try {
                 Function paimonFunction = catalog.getFunction(toIdentifier(ident));
                 FunctionDefinition functionDefinition =
@@ -582,11 +581,14 @@ public class SparkCatalog extends SparkBaseCatalog
         throw new NoSuchFunctionException(ident);
     }
 
-    private boolean isFunctionNamespace(String[] namespace) {
+    private boolean isSystemFunctionNamespace(String[] namespace) {
         // Allow for empty namespace, as Spark's bucket join will use `bucket` function with empty
         // namespace to generate transforms for partitioning.
-        // Otherwise, check if it is paimon namespace.
-        return namespace.length == 0 || (namespace.length == 1 && namespaceExists(namespace));
+        return namespace.length == 0 || isSystemNamespace(namespace);
+    }
+
+    private boolean isDatabaseFunctionNamespace(String[] namespace) {
+        return namespace.length == 1 && namespaceExists(namespace);
     }
 
     private PaimonV1FunctionRegistry v1FunctionRegistry() {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
index 1cb3035fad..ac6736e2e1 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.spark.catalog;
 
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.spark.SparkProcedures;
 import org.apache.paimon.spark.SparkSource;
 import org.apache.paimon.spark.analysis.NoSuchProcedureException;
@@ -35,6 +34,7 @@ import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.Set;
 
+import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.spark.sql.connector.catalog.TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE;
 
 /** Spark base catalog. */
@@ -54,7 +54,7 @@ public abstract class SparkBaseCatalog
 
     @Override
     public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException {
-        if (Catalog.SYSTEM_DATABASE_NAME.equals(identifier.namespace()[0])) {
+        if (isSystemNamespace(identifier.namespace())) {
             ProcedureBuilder builder = SparkProcedures.newBuilder(identifier.name());
             if (builder != null) {
                 return builder.withTableCatalog(this).build();
@@ -63,7 +63,11 @@ public abstract class SparkBaseCatalog
         throw new NoSuchProcedureException(identifier);
     }
 
-    public boolean usePaimon(@Nullable String provider) {
+    public static boolean usePaimon(@Nullable String provider) {
         return provider == null || SparkSource.NAME().equalsIgnoreCase(provider);
     }
+
+    public static boolean isSystemNamespace(String[] namespace) {
+        return namespace.length == 1 && namespace[0].equalsIgnoreCase(SYSTEM_DATABASE_NAME);
+    }
 }
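The dispatch these two files now implement can be mirrored in a standalone sketch; this is illustrative only, assuming `Catalog.SYSTEM_DATABASE_NAME` is `"sys"` (as the commit title suggests), and substituting a plain name check where the real `isDatabaseFunctionNamespace` consults `namespaceExists`:

```scala
// Illustrative mirror of the new namespace dispatch (not the shipped code).
val SystemDatabaseName = "sys" // assumption: Catalog.SYSTEM_DATABASE_NAME == "sys"

def isSystemNamespace(ns: Array[String]): Boolean =
  ns.length == 1 && ns(0).equalsIgnoreCase(SystemDatabaseName)

// Empty namespaces stay allowed so Spark's bucket join can still resolve `bucket`.
def isSystemFunctionNamespace(ns: Array[String]): Boolean =
  ns.isEmpty || isSystemNamespace(ns)

// The real check calls namespaceExists(ns); a name test stands in here.
def isDatabaseFunctionNamespace(ns: Array[String]): Boolean =
  ns.length == 1 && !isSystemNamespace(ns)

assert(isSystemFunctionNamespace(Array("sys")))        // sys.max_pt, sys.bucket
assert(isSystemFunctionNamespace(Array.empty[String])) // bare `bucket` from bucket join
assert(isDatabaseFunctionNamespace(Array("db2")))      // user-defined functions
```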
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index 575eb72711..ee8978c687 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -165,6 +165,9 @@ public class SparkCatalogWithRestTest {
                                 .get(0)
                                 .toString())
                 .isEqualTo("[3]");
+        assertThat(spark.sql("show user functions").collectAsList().toString())
+                .contains("[paimon.db2.area_func]");
+
         paimonCatalog.dropFunction(identifier, false);
         cleanFunction(functionName);
     }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
index b8fbcdae42..214965bf15 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
@@ -237,7 +237,7 @@ public class BucketFunctionTest {
         setupTable(bucketColumns);
         spark.sql(
                         String.format(
-                                "SELECT id_col, __paimon_bucket as expected_bucket, paimon.bucket(%s, %s) FROM %s",
+                                "SELECT id_col, __paimon_bucket as expected_bucket, paimon.sys.bucket(%s, %s) FROM %s",
                                 NUM_BUCKETS, String.join(",", bucketColumns), TABLE_NAME))
                 .collectAsList()
                 .forEach(row -> Assertions.assertThat(row.getInt(2)).isEqualTo(row.get(1)));
@@ -328,7 +328,7 @@ public class BucketFunctionTest {
         setupTable(TIMESTAMP_COL_PRECISION_3);
         spark.sql(
                         String.format(
-                                "SELECT id_col, __paimon_bucket as expected_bucket, paimon.bucket(%s, %s) FROM %s",
+                                "SELECT id_col, __paimon_bucket as expected_bucket, paimon.sys.bucket(%s, %s) FROM %s",
                                 NUM_BUCKETS,
                                 String.join(",", TIMESTAMP_COL_PRECISION_3),
                                 TABLE_NAME))
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
deleted file mode 100644
index 1ba618c6a1..0000000000
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.benchmark
-
-import org.apache.spark.sql.paimon.PaimonBenchmark
-
-object BucketFunctionBenchmark extends PaimonSqlBasedBenchmark {
-
-  private val N = 20L * 1000 * 1000
-
-  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
-    val benchmark = PaimonBenchmark(s"Bucket function", N, output = output)
-
-    benchmark.addCase("Single int column", 3) {
-      _ => spark.range(N).selectExpr("fixed_bucket(10, id)").noop()
-    }
-
-    benchmark.addCase("Single string column", 3) {
-      _ => spark.range(N).selectExpr("fixed_bucket(10, uuid())").noop()
-    }
-
-    benchmark.addCase("Multiple columns", 3) {
-      _ => spark.range(N).selectExpr("fixed_bucket(10, id, uuid(), uuid())").noop()
-    }
-
-    benchmark.run()
-  }
-}
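The updated assertions boil down to checking that the qualified bucket function agrees with the `__paimon_bucket` metadata column. A rough sketch of that check, with the catalog name `paimon`, bucket count `10`, column `id_col`, and table name `t` assumed from the test:

```scala
// Sketch of what BucketFunctionTest now verifies: paimon.sys.bucket(...)
// must reproduce the bucket recorded in the __paimon_bucket metadata column.
spark
  .sql("SELECT id_col, __paimon_bucket AS expected_bucket, paimon.sys.bucket(10, id_col) FROM t")
  .collect()
  .foreach(row => assert(row.get(2) == row.get(1)))
```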
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
index b1263cd8d2..765eb136bb 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
@@ -69,7 +69,7 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
     Seq("paimon", paimonHiveCatalogName).foreach {
       catalogName =>
         sql(s"use $catalogName")
-        val functions = sql("show user functions").collect()
+        val functions = sql("show user functions in sys").collect()
         assert(functions.exists(_.getString(0).contains("max_pt")), catalogName)
     }
   }
@@ -120,9 +120,9 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
       {
         sql(s"use $catalogName")
         val maxPt = if (catalogName == sparkCatalogName) {
-          "paimon.max_pt"
+          "paimon.sys.max_pt"
         } else {
-          "max_pt"
+          "sys.max_pt"
         }
 
         intercept[Exception] {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
index 743b36f8dd..eec9a1acb7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -106,7 +106,7 @@ abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBa
     sql("INSERT INTO t VALUES (1, 2), (3, 4)")
     checkAnswer(
       sql(
-        "SELECT a, udf_add2(pow(a, pt), max_pt('t')), pow(a, udf_add2(a, pt)) FROM t ORDER BY a"),
+        "SELECT a, udf_add2(pow(a, pt), sys.max_pt('t')), pow(a, udf_add2(a, pt)) FROM t ORDER BY a"),
       Seq(Row(1, 5.0d, 1.0d), Row(3, 85.0d, 2187.0d))
     )
   }
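Taken together, the test changes imply the following listing behavior; a hedged sketch, with `db2`/`area_func` borrowed from SparkCatalogWithRestTest:

```scala
// System functions now list under the `sys` namespace...
spark.sql("SHOW USER FUNCTIONS IN sys").show() // expected to include max_pt
// ...while catalog user-defined functions keep listing under their database.
spark.sql("SHOW USER FUNCTIONS IN db2").show() // e.g. area_func
```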
