This is an automated email from the ASF dual-hosted git repository. jinsongzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push: new 44f41df76 [AMORO-3385][bug]fix SparkUnifiedSessionCatalog should support load hive function for hive/mixed-hive table (#3385) 44f41df76 is described below commit 44f41df76d058f946d3b248606ecc60975acc3d3 Author: Wang Tao <wtrunw...@gmail.com> AuthorDate: Mon Jan 20 20:18:24 2025 +0800 [AMORO-3385][bug]fix SparkUnifiedSessionCatalog should support load hive function for hive/mixed-hive table (#3385) fix: unifiedCatalog should support load hive function --- .../org/apache/amoro/spark/SessionCatalogBase.java | 57 ++++++++++++++++------ .../spark/SparkUnifiedSessionCatalogBase.java | 8 ++- .../amoro/spark/mixed/MixedSessionCatalogBase.java | 8 ++- .../spark/MixedFormatSparkSessionCatalog.java | 12 ++++- .../amoro/spark/SparkUnifiedSessionCatalog.java | 37 ++------------ 5 files changed, 65 insertions(+), 57 deletions(-) diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java index 22921af9d..ee054c458 100644 --- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java +++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java @@ -19,18 +19,10 @@ package org.apache.amoro.spark; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; -import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; -import org.apache.spark.sql.connector.catalog.CatalogExtension; -import org.apache.spark.sql.connector.catalog.CatalogPlugin; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.NamespaceChange; -import org.apache.spark.sql.connector.catalog.SupportsNamespaces; -import org.apache.spark.sql.connector.catalog.Table; -import org.apache.spark.sql.connector.catalog.TableCatalog; -import org.apache.spark.sql.connector.catalog.TableChange; +import org.apache.iceberg.spark.functions.SparkFunctions; +import org.apache.spark.sql.catalyst.analysis.*; +import org.apache.spark.sql.connector.catalog.*; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -38,8 +30,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; import java.util.Map; /** Base class of spark session catalog. */ -public abstract class SessionCatalogBase<T extends TableCatalog & SupportsNamespaces> - implements SupportsNamespaces, CatalogExtension { +public abstract class SessionCatalogBase< + T extends TableCatalog & SupportsNamespaces & FunctionCatalog> + implements SupportsNamespaces, CatalogExtension, FunctionCatalog { private static final String[] DEFAULT_NAMESPACE = new String[] {"default"}; @@ -234,4 +227,40 @@ public abstract class SessionCatalogBase<T extends TableCatalog & SupportsNamesp getSessionCatalog().renameTable(from, to); } } + + private static boolean isSystemNamespace(String[] namespace) { + return namespace.length == 1 && namespace[0].equalsIgnoreCase("system"); + } + + @Override + public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { + String[] namespace = ident.namespace(); + String name = ident.name(); + + // Allow for empty namespace, as Spark's storage partitioned joins look up + // the corresponding functions to generate transforms for partitioning + // with an empty namespace, such as `bucket`. + // Otherwise, use `system` namespace. + if (namespace.length == 0 || isSystemNamespace(namespace)) { + UnboundFunction func = SparkFunctions.load(name); + if (func != null) { + return func; + } + } + + throw new NoSuchFunctionException(ident); + } + + @Override + public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { + if (namespace.length == 0 || isSystemNamespace(namespace)) { + return SparkFunctions.list().stream() + .map(name -> Identifier.of(namespace, name)) + .toArray(Identifier[]::new); + } else if (namespaceExists(namespace)) { + return new Identifier[0]; + } + + throw new NoSuchNamespaceException(namespace); + } } diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java index a42db5c42..50f0ed0cc 100644 --- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java +++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java @@ -21,10 +21,7 @@ package org.apache.amoro.spark; import org.apache.amoro.TableFormat; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.SupportsNamespaces; -import org.apache.spark.sql.connector.catalog.Table; -import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.*; import org.apache.spark.sql.connector.iceberg.catalog.Procedure; import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -32,7 +29,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; import java.util.Map; import java.util.ServiceLoader; -public abstract class SparkUnifiedSessionCatalogBase<T extends TableCatalog & SupportsNamespaces> +public abstract class SparkUnifiedSessionCatalogBase< + T extends TableCatalog & SupportsNamespaces & FunctionCatalog> extends SessionCatalogBase<T> implements ProcedureCatalog { protected final Map<TableFormat, SparkTableFormat> tableFormats = Maps.newConcurrentMap(); diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java index 3a42607d5..b9b13ee27 100644 --- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java +++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java @@ -21,15 +21,13 @@ package org.apache.amoro.spark.mixed; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet; import org.apache.amoro.spark.SessionCatalogBase; import org.apache.commons.lang3.StringUtils; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.SupportsNamespaces; -import org.apache.spark.sql.connector.catalog.Table; -import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.*; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import java.util.Set; -public abstract class MixedSessionCatalogBase<T extends TableCatalog & SupportsNamespaces> +public abstract class MixedSessionCatalogBase< + T extends TableCatalog & SupportsNamespaces & FunctionCatalog> extends SessionCatalogBase<T> { /** Using {@link #MIXED_ICEBERG_PROVIDER} or {@link #MIXED_HIVE_PROVIDER} instead. */ diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java b/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java index 19abfda8c..320b84574 100644 --- a/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java +++ b/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java @@ -20,8 +20,12 @@ package org.apache.amoro.spark; import org.apache.amoro.spark.mixed.MixedSessionCatalogBase; import org.apache.amoro.spark.mixed.MixedSparkCatalogBase; +import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException; +import org.apache.spark.sql.connector.catalog.FunctionCatalog; +import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.SupportsNamespaces; import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.Procedure; import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** @@ -29,7 +33,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; * * @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces. */ -public class MixedFormatSparkSessionCatalog<T extends TableCatalog & SupportsNamespaces> +public class MixedFormatSparkSessionCatalog< + T extends TableCatalog & SupportsNamespaces & FunctionCatalog> extends MixedSessionCatalogBase<T> { protected MixedSparkCatalogBase buildTargetCatalog( @@ -38,4 +43,9 @@ public class MixedFormatSparkSessionCatalog<T extends TableCatalog & SupportsNam newCatalog.initialize(name, options); return newCatalog; } + + @Override + public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException { + throw new NoSuchProcedureException(ident); + } } diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java index 96213a63f..c5578d567 100644 --- a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java +++ b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java @@ -19,7 +19,6 @@ package org.apache.amoro.spark; import org.apache.amoro.TableFormat; -import org.apache.iceberg.spark.functions.SparkFunctions; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -68,38 +67,13 @@ public class SparkUnifiedSessionCatalog< } } - /** - * List the functions in a namespace from the catalog. - * - * <p>If there are no functions in the namespace, implementations should return an empty array. - * - * @param namespace a multi-part namespace - * @return an array of Identifiers for functions - * @throws NoSuchNamespaceException If the namespace does not exist (optional). - */ - @Override - public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { - SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog(); - return catalog.listFunctions(namespace); - } - @Override public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { - String[] namespace = ident.namespace(); - String name = ident.name(); - - // Allow for empty namespace, as Spark's storage partitioned joins look up - // the corresponding functions to generate transforms for partitioning - // with an empty namespace, such as `bucket`. - // Otherwise, use `system` namespace. - if (namespace.length == 0 || isSystemNamespace(namespace)) { - UnboundFunction func = SparkFunctions.load(name); - if (func != null) { - return func; - } + try { + return super.loadFunction(ident); + } catch (NoSuchFunctionException e) { + return getSessionCatalog().loadFunction(ident); } - - throw new NoSuchFunctionException(ident); } private static boolean isSystemNamespace(String[] namespace) { @@ -123,7 +97,6 @@ public class SparkUnifiedSessionCatalog< @Override public boolean dropNamespace(String[] namespace, boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException { - SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog(); - return catalog.dropNamespace(namespace, cascade); + return getSessionCatalog().dropNamespace(namespace, cascade); } }