This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 44f41df76 [AMORO-3385][bug]fix SparkUnifiedSessionCatalog should support load hive function for hive/mixed-hive table (#3385)
44f41df76 is described below

commit 44f41df76d058f946d3b248606ecc60975acc3d3
Author: Wang Tao <wtrunw...@gmail.com>
AuthorDate: Mon Jan 20 20:18:24 2025 +0800

    [AMORO-3385][bug]fix SparkUnifiedSessionCatalog should support load hive function for hive/mixed-hive table (#3385)
    
    fix: unifiedCatalog should support load hive function
---
 .../org/apache/amoro/spark/SessionCatalogBase.java | 57 ++++++++++++++++------
 .../spark/SparkUnifiedSessionCatalogBase.java      |  8 ++-
 .../amoro/spark/mixed/MixedSessionCatalogBase.java |  8 ++-
 .../spark/MixedFormatSparkSessionCatalog.java      | 12 ++++-
 .../amoro/spark/SparkUnifiedSessionCatalog.java    | 37 ++------------
 5 files changed, 65 insertions(+), 57 deletions(-)

diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java
index 22921af9d..ee054c458 100644
--- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java
+++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java
@@ -19,18 +19,10 @@
 package org.apache.amoro.spark;
 
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
-import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
-import org.apache.spark.sql.connector.catalog.CatalogExtension;
-import org.apache.spark.sql.connector.catalog.CatalogPlugin;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.NamespaceChange;
-import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.iceberg.spark.functions.SparkFunctions;
+import org.apache.spark.sql.catalyst.analysis.*;
+import org.apache.spark.sql.connector.catalog.*;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -38,8 +30,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import java.util.Map;
 
 /** Base class of spark session catalog. */
-public abstract class SessionCatalogBase<T extends TableCatalog & SupportsNamespaces>
-    implements SupportsNamespaces, CatalogExtension {
+public abstract class SessionCatalogBase<
+        T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
+    implements SupportsNamespaces, CatalogExtension, FunctionCatalog {
 
   private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
 
@@ -234,4 +227,40 @@ public abstract class SessionCatalogBase<T extends TableCatalog & SupportsNamesp
       getSessionCatalog().renameTable(from, to);
     }
   }
+
+  private static boolean isSystemNamespace(String[] namespace) {
+    return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
+  }
+
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+    String[] namespace = ident.namespace();
+    String name = ident.name();
+
+    // Allow for empty namespace, as Spark's storage partitioned joins look up
+    // the corresponding functions to generate transforms for partitioning
+    // with an empty namespace, such as `bucket`.
+    // Otherwise, use `system` namespace.
+    if (namespace.length == 0 || isSystemNamespace(namespace)) {
+      UnboundFunction func = SparkFunctions.load(name);
+      if (func != null) {
+        return func;
+      }
+    }
+
+    throw new NoSuchFunctionException(ident);
+  }
+
+  @Override
+  public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+    if (namespace.length == 0 || isSystemNamespace(namespace)) {
+      return SparkFunctions.list().stream()
+          .map(name -> Identifier.of(namespace, name))
+          .toArray(Identifier[]::new);
+    } else if (namespaceExists(namespace)) {
+      return new Identifier[0];
+    }
+
+    throw new NoSuchNamespaceException(namespace);
+  }
 }
diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
index a42db5c42..50f0ed0cc 100644
--- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
+++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
@@ -21,10 +21,7 @@ package org.apache.amoro.spark;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.*;
 import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -32,7 +29,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import java.util.Map;
 import java.util.ServiceLoader;
 
-public abstract class SparkUnifiedSessionCatalogBase<T extends TableCatalog & SupportsNamespaces>
+public abstract class SparkUnifiedSessionCatalogBase<
+        T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
     extends SessionCatalogBase<T> implements ProcedureCatalog {
 
   protected final Map<TableFormat, SparkTableFormat> tableFormats = Maps.newConcurrentMap();
diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java
index 3a42607d5..b9b13ee27 100644
--- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java
+++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java
@@ -21,15 +21,13 @@ package org.apache.amoro.spark.mixed;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet;
 import org.apache.amoro.spark.SessionCatalogBase;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.*;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.util.Set;
 
-public abstract class MixedSessionCatalogBase<T extends TableCatalog & SupportsNamespaces>
+public abstract class MixedSessionCatalogBase<
+        T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
     extends SessionCatalogBase<T> {
 
   /** Using {@link #MIXED_ICEBERG_PROVIDER} or {@link #MIXED_HIVE_PROVIDER} instead. */
diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java b/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java
index 19abfda8c..320b84574 100644
--- a/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java
+++ b/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java
@@ -20,8 +20,12 @@ package org.apache.amoro.spark;
 
 import org.apache.amoro.spark.mixed.MixedSessionCatalogBase;
 import org.apache.amoro.spark.mixed.MixedSparkCatalogBase;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
@@ -29,7 +33,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
  *
  * @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces.
  */
-public class MixedFormatSparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
+public class MixedFormatSparkSessionCatalog<
+        T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
     extends MixedSessionCatalogBase<T> {
 
   protected MixedSparkCatalogBase buildTargetCatalog(
@@ -38,4 +43,9 @@ public class MixedFormatSparkSessionCatalog<T extends TableCatalog & SupportsNam
     newCatalog.initialize(name, options);
     return newCatalog;
   }
+
+  @Override
+  public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException {
+    throw new NoSuchProcedureException(ident);
+  }
 }
diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
index 96213a63f..c5578d567 100644
--- a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
+++ b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
@@ -19,7 +19,6 @@
 package org.apache.amoro.spark;
 
 import org.apache.amoro.TableFormat;
-import org.apache.iceberg.spark.functions.SparkFunctions;
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -68,38 +67,13 @@ public class SparkUnifiedSessionCatalog<
     }
   }
 
-  /**
-   * List the functions in a namespace from the catalog.
-   *
-   * <p>If there are no functions in the namespace, implementations should return an empty array.
-   *
-   * @param namespace a multi-part namespace
-   * @return an array of Identifiers for functions
-   * @throws NoSuchNamespaceException If the namespace does not exist (optional).
-   */
-  @Override
-  public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
-    SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
-    return catalog.listFunctions(namespace);
-  }
-
   @Override
   public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
-    String[] namespace = ident.namespace();
-    String name = ident.name();
-
-    // Allow for empty namespace, as Spark's storage partitioned joins look up
-    // the corresponding functions to generate transforms for partitioning
-    // with an empty namespace, such as `bucket`.
-    // Otherwise, use `system` namespace.
-    if (namespace.length == 0 || isSystemNamespace(namespace)) {
-      UnboundFunction func = SparkFunctions.load(name);
-      if (func != null) {
-        return func;
-      }
+    try {
+      return super.loadFunction(ident);
+    } catch (NoSuchFunctionException e) {
+      return getSessionCatalog().loadFunction(ident);
     }
-
-    throw new NoSuchFunctionException(ident);
   }
 
   private static boolean isSystemNamespace(String[] namespace) {
@@ -123,7 +97,6 @@ public class SparkUnifiedSessionCatalog<
   @Override
   public boolean dropNamespace(String[] namespace, boolean cascade)
       throws NoSuchNamespaceException, NonEmptyNamespaceException {
-    SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
-    return catalog.dropNamespace(namespace, cascade);
+    return getSessionCatalog().dropNamespace(namespace, cascade);
   }
 }

Reply via email to