This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 44f41df76 [AMORO-3385][bug]fix SparkUnifiedSessionCatalog should
support load hive function for hive/mixed-hive table (#3385)
44f41df76 is described below
commit 44f41df76d058f946d3b248606ecc60975acc3d3
Author: Wang Tao <[email protected]>
AuthorDate: Mon Jan 20 20:18:24 2025 +0800
[AMORO-3385][bug]fix SparkUnifiedSessionCatalog should support load hive
function for hive/mixed-hive table (#3385)
fix: unifiedCatalog should support loading Hive functions
---
.../org/apache/amoro/spark/SessionCatalogBase.java | 57 ++++++++++++++++------
.../spark/SparkUnifiedSessionCatalogBase.java | 8 ++-
.../amoro/spark/mixed/MixedSessionCatalogBase.java | 8 ++-
.../spark/MixedFormatSparkSessionCatalog.java | 12 ++++-
.../amoro/spark/SparkUnifiedSessionCatalog.java | 37 ++------------
5 files changed, 65 insertions(+), 57 deletions(-)
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java
index 22921af9d..ee054c458 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SessionCatalogBase.java
@@ -19,18 +19,10 @@
package org.apache.amoro.spark;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
-import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
-import org.apache.spark.sql.connector.catalog.CatalogExtension;
-import org.apache.spark.sql.connector.catalog.CatalogPlugin;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.NamespaceChange;
-import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.iceberg.spark.functions.SparkFunctions;
+import org.apache.spark.sql.catalyst.analysis.*;
+import org.apache.spark.sql.connector.catalog.*;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -38,8 +30,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.util.Map;
/** Base class of spark session catalog. */
-public abstract class SessionCatalogBase<T extends TableCatalog &
SupportsNamespaces>
- implements SupportsNamespaces, CatalogExtension {
+public abstract class SessionCatalogBase<
+ T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
+ implements SupportsNamespaces, CatalogExtension, FunctionCatalog {
private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
@@ -234,4 +227,40 @@ public abstract class SessionCatalogBase<T extends
TableCatalog & SupportsNamesp
getSessionCatalog().renameTable(from, to);
}
}
+
+ private static boolean isSystemNamespace(String[] namespace) {
+ return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
+ }
+
+ @Override
+ public UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
+ String[] namespace = ident.namespace();
+ String name = ident.name();
+
+ // Allow for empty namespace, as Spark's storage partitioned joins look up
+ // the corresponding functions to generate transforms for partitioning
+ // with an empty namespace, such as `bucket`.
+ // Otherwise, use `system` namespace.
+ if (namespace.length == 0 || isSystemNamespace(namespace)) {
+ UnboundFunction func = SparkFunctions.load(name);
+ if (func != null) {
+ return func;
+ }
+ }
+
+ throw new NoSuchFunctionException(ident);
+ }
+
+ @Override
+ public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
+ if (namespace.length == 0 || isSystemNamespace(namespace)) {
+ return SparkFunctions.list().stream()
+ .map(name -> Identifier.of(namespace, name))
+ .toArray(Identifier[]::new);
+ } else if (namespaceExists(namespace)) {
+ return new Identifier[0];
+ }
+
+ throw new NoSuchNamespaceException(namespace);
+ }
}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
index a42db5c42..50f0ed0cc 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
@@ -21,10 +21,7 @@ package org.apache.amoro.spark;
import org.apache.amoro.TableFormat;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.*;
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -32,7 +29,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.util.Map;
import java.util.ServiceLoader;
-public abstract class SparkUnifiedSessionCatalogBase<T extends TableCatalog &
SupportsNamespaces>
+public abstract class SparkUnifiedSessionCatalogBase<
+ T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
extends SessionCatalogBase<T> implements ProcedureCatalog {
protected final Map<TableFormat, SparkTableFormat> tableFormats =
Maps.newConcurrentMap();
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java
index 3a42607d5..b9b13ee27 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSessionCatalogBase.java
@@ -21,15 +21,13 @@ package org.apache.amoro.spark.mixed;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet;
import org.apache.amoro.spark.SessionCatalogBase;
import org.apache.commons.lang3.StringUtils;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.*;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.util.Set;
-public abstract class MixedSessionCatalogBase<T extends TableCatalog &
SupportsNamespaces>
+public abstract class MixedSessionCatalogBase<
+ T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
extends SessionCatalogBase<T> {
/** Using {@link #MIXED_ICEBERG_PROVIDER} or {@link #MIXED_HIVE_PROVIDER}
instead. */
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java
b/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java
index 19abfda8c..320b84574 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkSessionCatalog.java
@@ -20,8 +20,12 @@ package org.apache.amoro.spark;
import org.apache.amoro.spark.mixed.MixedSessionCatalogBase;
import org.apache.amoro.spark.mixed.MixedSparkCatalogBase;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
@@ -29,7 +33,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
*
* @param <T> CatalogPlugin class to avoid casting to TableCatalog and
SupportsNamespaces.
*/
-public class MixedFormatSparkSessionCatalog<T extends TableCatalog &
SupportsNamespaces>
+public class MixedFormatSparkSessionCatalog<
+ T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
extends MixedSessionCatalogBase<T> {
protected MixedSparkCatalogBase buildTargetCatalog(
@@ -38,4 +43,9 @@ public class MixedFormatSparkSessionCatalog<T extends
TableCatalog & SupportsNam
newCatalog.initialize(name, options);
return newCatalog;
}
+
+ @Override
+ public Procedure loadProcedure(Identifier ident) throws
NoSuchProcedureException {
+ throw new NoSuchProcedureException(ident);
+ }
}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
index 96213a63f..c5578d567 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
@@ -19,7 +19,6 @@
package org.apache.amoro.spark;
import org.apache.amoro.TableFormat;
-import org.apache.iceberg.spark.functions.SparkFunctions;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -68,38 +67,13 @@ public class SparkUnifiedSessionCatalog<
}
}
- /**
- * List the functions in a namespace from the catalog.
- *
- * <p>If there are no functions in the namespace, implementations should
return an empty array.
- *
- * @param namespace a multi-part namespace
- * @return an array of Identifiers for functions
- * @throws NoSuchNamespaceException If the namespace does not exist
(optional).
- */
- @Override
- public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
- SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
- return catalog.listFunctions(namespace);
- }
-
@Override
public UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
- String[] namespace = ident.namespace();
- String name = ident.name();
-
- // Allow for empty namespace, as Spark's storage partitioned joins look up
- // the corresponding functions to generate transforms for partitioning
- // with an empty namespace, such as `bucket`.
- // Otherwise, use `system` namespace.
- if (namespace.length == 0 || isSystemNamespace(namespace)) {
- UnboundFunction func = SparkFunctions.load(name);
- if (func != null) {
- return func;
- }
+ try {
+ return super.loadFunction(ident);
+ } catch (NoSuchFunctionException e) {
+ return getSessionCatalog().loadFunction(ident);
}
-
- throw new NoSuchFunctionException(ident);
}
private static boolean isSystemNamespace(String[] namespace) {
@@ -123,7 +97,6 @@ public class SparkUnifiedSessionCatalog<
@Override
public boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException, NonEmptyNamespaceException {
- SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
- return catalog.dropNamespace(namespace, cascade);
+ return getSessionCatalog().dropNamespace(namespace, cascade);
}
}