This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit b7f49f84aa0859784d3ffc190ae578f22ee1413a
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 24 00:37:04 2025 +0800

    [spark] System function max_pt should be used as 'sys.max_pt' (#6312)
---
 docs/content/spark/sql-functions.md                |  9 ++---
 .../java/org/apache/paimon/spark/SparkCatalog.java | 42 +++++++++++----------
 .../paimon/spark/catalog/SparkBaseCatalog.java     | 10 +++--
 .../paimon/spark/SparkCatalogWithRestTest.java     |  3 ++
 .../catalog/functions/BucketFunctionTest.java      |  4 +-
 .../spark/benchmark/BucketFunctionBenchmark.scala  | 44 ----------------------
 .../paimon/spark/sql/PaimonFunctionTest.scala      |  6 +--
 .../spark/sql/PaimonV1FunctionTestBase.scala       |  2 +-
 8 files changed, 41 insertions(+), 79 deletions(-)

diff --git a/docs/content/spark/sql-functions.md b/docs/content/spark/sql-functions.md
index 65019ac5fd..99f4dcb153 100644
--- a/docs/content/spark/sql-functions.md
+++ b/docs/content/spark/sql-functions.md
@@ -28,12 +28,11 @@ under the License.
 
 This section introduces all available Paimon Spark functions.
 
-
 ## Built-in Function
 
 ### max_pt
 
-`max_pt($table_name)`
+`sys.max_pt($table_name)`
 
 It accepts a string type literal to specify the table name and returns a max-valid-toplevel partition value.
 - **valid**: the partition which contains data files
@@ -47,15 +46,13 @@ It would throw exception when:
 **Example**
 
 ```sql
-SELECT max_pt('t');
+SELECT sys.max_pt('t');
 -- 20250101
  
-SELECT * FROM t where pt = max_pt('t');
+SELECT * FROM t where pt = sys.max_pt('t');
 -- a, 20250101
 ```
 
-**Since: 1.1.0**
-
 ## User-defined Function
 
 Paimon Spark supports two types of user-defined functions: lambda functions and file-based functions.
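
For context on the doc change above, here is a minimal sketch of calling the relocated function from a Spark application; the table `t`, partition column `pt`, and session setup are assumptions mirroring the doc example, not part of the patch.

```java
// Hypothetical standalone example: calling max_pt under its new `sys`
// namespace via the Spark SQL API, mirroring the doc snippet above.
import org.apache.spark.sql.SparkSession;

public class MaxPtExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("max-pt-example")
                .getOrCreate();

        // Before this commit: SELECT max_pt('t'). After it, the built-in
        // function resolves only under the system namespace.
        spark.sql("SELECT * FROM t WHERE pt = sys.max_pt('t')").show();

        spark.stop();
    }
}
```
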
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 1df1d66f80..db7281de92 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -524,34 +524,33 @@ public class SparkCatalog extends SparkBaseCatalog
 
     @Override
     public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
-        if (isFunctionNamespace(namespace)) {
-            List<Identifier> functionIdentifiers = new ArrayList<>();
-            PaimonFunctions.names()
-                    .forEach(name -> functionIdentifiers.add(Identifier.of(namespace, name)));
-            if (namespace.length > 0) {
-                String databaseName = getDatabaseNameFromNamespace(namespace);
-                try {
-                    catalog.listFunctions(databaseName)
-                            .forEach(
-                                    name ->
-                                            functionIdentifiers.add(
-                                                    Identifier.of(namespace, name)));
-                } catch (Catalog.DatabaseNotExistException e) {
-                    throw new NoSuchNamespaceException(namespace);
-                }
+        if (isSystemFunctionNamespace(namespace)) {
+            List<Identifier> result = new ArrayList<>();
+            PaimonFunctions.names().forEach(name -> result.add(Identifier.of(namespace, name)));
+            return result.toArray(new Identifier[0]);
+        } else if (isDatabaseFunctionNamespace(namespace)) {
+            List<Identifier> result = new ArrayList<>();
+            String databaseName = getDatabaseNameFromNamespace(namespace);
+            try {
+                catalog.listFunctions(databaseName)
+                        .forEach(name -> result.add(Identifier.of(namespace, name)));
+            } catch (Catalog.DatabaseNotExistException e) {
+                throw new NoSuchNamespaceException(namespace);
             }
-            return functionIdentifiers.toArray(new Identifier[0]);
+            return result.toArray(new Identifier[0]);
         }
         throw new NoSuchNamespaceException(namespace);
     }
 
     @Override
     public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
-        if (isFunctionNamespace(ident.namespace())) {
+        String[] namespace = ident.namespace();
+        if (isSystemFunctionNamespace(namespace)) {
             UnboundFunction func = PaimonFunctions.load(ident.name());
             if (func != null) {
                 return func;
             }
+        } else if (isDatabaseFunctionNamespace(namespace)) {
             try {
                 Function paimonFunction = catalog.getFunction(toIdentifier(ident));
                 FunctionDefinition functionDefinition =
@@ -582,11 +581,14 @@ public class SparkCatalog extends SparkBaseCatalog
         throw new NoSuchFunctionException(ident);
     }
 
-    private boolean isFunctionNamespace(String[] namespace) {
+    private boolean isSystemFunctionNamespace(String[] namespace) {
         // Allow for empty namespace, as Spark's bucket join will use `bucket` function with empty
         // namespace to generate transforms for partitioning.
-        // Otherwise, check if it is paimon namespace.
-        return namespace.length == 0 || (namespace.length == 1 && namespaceExists(namespace));
+        return namespace.length == 0 || isSystemNamespace(namespace);
+    }
+
+    private boolean isDatabaseFunctionNamespace(String[] namespace) {
+        return namespace.length == 1 && namespaceExists(namespace);
     }
 
     private PaimonV1FunctionRegistry v1FunctionRegistry() {
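
The hunks above split the old `isFunctionNamespace` check into two predicates, so built-in and database functions no longer share one resolution path. A simplified, self-contained sketch of that dispatch follows; the maps standing in for `PaimonFunctions` and the Paimon catalog, and the inlined `"sys"` constant, are assumptions for illustration only.

```java
// Simplified sketch of the loadFunction dispatch introduced above.
// BUILT_INS and DB_FUNCTIONS are hypothetical stand-ins for
// PaimonFunctions and the Paimon catalog.
import java.util.HashMap;
import java.util.Map;

public class LoadFunctionSketch {

    static final Map<String, String> BUILT_INS = new HashMap<>();
    static final Map<String, String> DB_FUNCTIONS = new HashMap<>();

    static {
        BUILT_INS.put("max_pt", "<built-in max_pt>");
        BUILT_INS.put("bucket", "<built-in bucket>");
        DB_FUNCTIONS.put("db2.area_func", "<user-defined area_func>");
    }

    static String loadFunction(String[] namespace, String name) {
        // Empty namespace stays allowed: Spark's bucket join looks up
        // `bucket` with no namespace to generate partition transforms.
        if (namespace.length == 0 || "sys".equalsIgnoreCase(namespace[0])) {
            String builtIn = BUILT_INS.get(name);
            if (builtIn != null) {
                return builtIn;
            }
        } else if (namespace.length == 1) {
            // The real predicate also verifies that the database exists.
            String udf = DB_FUNCTIONS.get(namespace[0] + "." + name);
            if (udf != null) {
                return udf;
            }
        }
        throw new IllegalArgumentException("NoSuchFunction: " + name);
    }

    public static void main(String[] args) {
        System.out.println(loadFunction(new String[] {"sys"}, "max_pt"));    // built-in
        System.out.println(loadFunction(new String[] {}, "bucket"));         // bucket-join path
        System.out.println(loadFunction(new String[] {"db2"}, "area_func")); // database UDF
    }
}
```
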
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
index 1cb3035fad..ac6736e2e1 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.spark.catalog;
 
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.spark.SparkProcedures;
 import org.apache.paimon.spark.SparkSource;
 import org.apache.paimon.spark.analysis.NoSuchProcedureException;
@@ -35,6 +34,7 @@ import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.Set;
 
+import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.spark.sql.connector.catalog.TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE;
 
 /** Spark base catalog. */
@@ -54,7 +54,7 @@ public abstract class SparkBaseCatalog
 
     @Override
     public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException {
-        if (Catalog.SYSTEM_DATABASE_NAME.equals(identifier.namespace()[0])) {
+        if (isSystemNamespace(identifier.namespace())) {
             ProcedureBuilder builder = SparkProcedures.newBuilder(identifier.name());
             if (builder != null) {
                 return builder.withTableCatalog(this).build();
@@ -63,7 +63,11 @@ public abstract class SparkBaseCatalog
         throw new NoSuchProcedureException(identifier);
     }
 
-    public boolean usePaimon(@Nullable String provider) {
+    public static boolean usePaimon(@Nullable String provider) {
         return provider == null || SparkSource.NAME().equalsIgnoreCase(provider);
     }
+
+    public static boolean isSystemNamespace(String[] namespace) {
+        return namespace.length == 1 && namespace[0].equalsIgnoreCase(SYSTEM_DATABASE_NAME);
+    }
 }
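
The new static `isSystemNamespace` gives procedure and function lookup a single shared check. Unlike the old direct `identifier.namespace()[0]` comparison, it also guards against an empty namespace array and compares case-insensitively. A minimal runnable sketch follows; the constant is inlined here so the example stands alone, whereas the real code imports `SYSTEM_DATABASE_NAME` from `Catalog`.

```java
// Minimal sketch of the shared helper added above; the `sys` constant
// is inlined to keep the example self-contained.
public class SystemNamespaceCheck {

    static final String SYSTEM_DATABASE_NAME = "sys";

    public static boolean isSystemNamespace(String[] namespace) {
        // Length check first: the old namespace()[0] access could throw
        // ArrayIndexOutOfBoundsException on an empty namespace.
        return namespace.length == 1
                && namespace[0].equalsIgnoreCase(SYSTEM_DATABASE_NAME);
    }

    public static void main(String[] args) {
        System.out.println(isSystemNamespace(new String[] {"sys"})); // true
        System.out.println(isSystemNamespace(new String[] {"SYS"})); // true, case-insensitive
        System.out.println(isSystemNamespace(new String[] {"db1"})); // false
        System.out.println(isSystemNamespace(new String[] {}));      // false, no exception
    }
}
```
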
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index 575eb72711..ee8978c687 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -165,6 +165,9 @@ public class SparkCatalogWithRestTest {
                                 .get(0)
                                 .toString())
                 .isEqualTo("[3]");
+        assertThat(spark.sql("show user functions").collectAsList().toString())
+                .contains("[paimon.db2.area_func]");
+
         paimonCatalog.dropFunction(identifier, false);
         cleanFunction(functionName);
     }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
index b8fbcdae42..214965bf15 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
@@ -237,7 +237,7 @@ public class BucketFunctionTest {
         setupTable(bucketColumns);
         spark.sql(
                         String.format(
-                                "SELECT id_col, __paimon_bucket as expected_bucket, paimon.bucket(%s, %s) FROM %s",
+                                "SELECT id_col, __paimon_bucket as expected_bucket, paimon.sys.bucket(%s, %s) FROM %s",
                                 NUM_BUCKETS, String.join(",", bucketColumns), TABLE_NAME))
                 .collectAsList()
                 .forEach(row -> Assertions.assertThat(row.getInt(2)).isEqualTo(row.get(1)));
@@ -328,7 +328,7 @@ public class BucketFunctionTest {
         setupTable(TIMESTAMP_COL_PRECISION_3);
         spark.sql(
                         String.format(
-                                "SELECT id_col, __paimon_bucket as expected_bucket, paimon.bucket(%s, %s) FROM %s",
+                                "SELECT id_col, __paimon_bucket as expected_bucket, paimon.sys.bucket(%s, %s) FROM %s",
                                 NUM_BUCKETS,
                                 String.join(",", TIMESTAMP_COL_PRECISION_3),
                                 TABLE_NAME))
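
The test updates above show the user-facing consequence for the bucket function: with an explicit catalog name, the built-in now needs its full three-part name. A hypothetical sketch, where the catalog name `paimon`, table `t`, and column `id_col` are assumptions drawn from the test:

```java
// Hypothetical example of the fully qualified built-in bucket call the
// updated tests exercise: <catalog>.sys.<function>.
import org.apache.spark.sql.SparkSession;

public class BucketFunctionCall {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("bucket-function-example")
                .getOrCreate();

        // paimon.bucket(...) no longer resolves; the `sys` namespace is
        // required between the catalog and the function name.
        spark.sql("SELECT id_col, paimon.sys.bucket(10, id_col) FROM t").show();

        spark.stop();
    }
}
```
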
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
deleted file mode 100644
index 1ba618c6a1..0000000000
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.benchmark
-
-import org.apache.spark.sql.paimon.PaimonBenchmark
-
-object BucketFunctionBenchmark extends PaimonSqlBasedBenchmark {
-
-  private val N = 20L * 1000 * 1000
-
-  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
-    val benchmark = PaimonBenchmark(s"Bucket function", N, output = output)
-
-    benchmark.addCase("Single int column", 3) {
-      _ => spark.range(N).selectExpr("fixed_bucket(10, id)").noop()
-    }
-
-    benchmark.addCase("Single string column", 3) {
-      _ => spark.range(N).selectExpr("fixed_bucket(10, uuid())").noop()
-    }
-
-    benchmark.addCase("Multiple columns", 3) {
-      _ => spark.range(N).selectExpr("fixed_bucket(10, id, uuid(), uuid())").noop()
-    }
-
-    benchmark.run()
-  }
-}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
index b1263cd8d2..765eb136bb 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
@@ -69,7 +69,7 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
     Seq("paimon", paimonHiveCatalogName).foreach {
       catalogName =>
         sql(s"use $catalogName")
-        val functions = sql("show user functions").collect()
+        val functions = sql("show user functions in sys").collect()
         assert(functions.exists(_.getString(0).contains("max_pt")), catalogName)
     }
   }
@@ -120,9 +120,9 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
         {
           sql(s"use $catalogName")
           val maxPt = if (catalogName == sparkCatalogName) {
-            "paimon.max_pt"
+            "paimon.sys.max_pt"
           } else {
-            "max_pt"
+            "sys.max_pt"
           }
 
           intercept[Exception] {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
index 743b36f8dd..eec9a1acb7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -106,7 +106,7 @@ abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBa
         sql("INSERT INTO t VALUES (1, 2), (3, 4)")
         checkAnswer(
           sql(
-            "SELECT a, udf_add2(pow(a, pt), max_pt('t')), pow(a, udf_add2(a, pt)) FROM t ORDER BY a"),
+            "SELECT a, udf_add2(pow(a, pt), sys.max_pt('t')), pow(a, udf_add2(a, pt)) FROM t ORDER BY a"),
           Seq(Row(1, 5.0d, 1.0d), Row(3, 85.0d, 2187.0d))
         )
       }
