This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 07b0a15130 Spark 3.2, 3.3: SparkSessionCatalog should delegate load 
function if Iceberg function is not found (#7153)
07b0a15130 is described below

commit 07b0a15130db1592fcd1c43c8d872e91c2901f22
Author: Bowen Liang <[email protected]>
AuthorDate: Sun Mar 26 09:15:58 2023 +0800

    Spark 3.2, 3.3: SparkSessionCatalog should delegate load function if 
Iceberg function is not found (#7153)
---
 .../apache/iceberg/spark/SparkSessionCatalog.java  | 15 +++++++++
 .../iceberg/spark/TestSparkSessionCatalog.java     | 36 ++++++++++++++++++----
 .../apache/iceberg/spark/SparkSessionCatalog.java  | 20 ++++++++++--
 .../iceberg/spark/TestSparkSessionCatalog.java     | 36 ++++++++++++++++++----
 .../iceberg/spark/source/TestSparkCatalog.java     |  3 +-
 5 files changed, 94 insertions(+), 16 deletions(-)

diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 68c7211311..c041df2ae3 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -27,11 +27,13 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.source.HasIcebergCatalog;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.CatalogExtension;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.StagedTable;
@@ -40,6 +42,7 @@ import 
org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -352,4 +355,16 @@ public class SparkSessionCatalog<T extends TableCatalog & 
SupportsNamespaces> ex
         "Cannot return underlying Iceberg Catalog, wrapped catalog does not 
contain an Iceberg Catalog");
     return ((HasIcebergCatalog) icebergCatalog).icebergCatalog();
   }
+
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws 
NoSuchFunctionException {
+    try {
+      return super.loadFunction(ident);
+    } catch (NoSuchFunctionException e) {
+      if (getSessionCatalog() instanceof FunctionCatalog) {
+        return ((FunctionCatalog) getSessionCatalog()).loadFunction(ident);
+      }
+    }
+    throw new NoSuchFunctionException(ident);
+  }
 }
diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
index 6737dc64ff..82a2fb4733 100644
--- 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
+++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
@@ -21,24 +21,33 @@ package org.apache.iceberg.spark;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestSparkSessionCatalog extends SparkTestBase {
-  @Test
-  public void testValidateHmsUri() {
-    String envHmsUriKey = "spark.hadoop." + METASTOREURIS.varname;
-    String catalogHmsUriKey = "spark.sql.catalog.spark_catalog.uri";
-    String hmsUri = hiveConf.get(METASTOREURIS.varname);
+  private final String envHmsUriKey = "spark.hadoop." + METASTOREURIS.varname;
+  private final String catalogHmsUriKey = 
"spark.sql.catalog.spark_catalog.uri";
+  private final String hmsUri = hiveConf.get(METASTOREURIS.varname);
 
+  @BeforeClass
+  public static void setUpCatalog() {
     spark
         .conf()
         .set("spark.sql.catalog.spark_catalog", 
"org.apache.iceberg.spark.SparkSessionCatalog");
     spark.conf().set("spark.sql.catalog.spark_catalog.type", "hive");
+  }
 
-    // HMS uris match
+  @Before
+  public void setupHmsUri() {
     spark.sessionState().catalogManager().reset();
     spark.conf().set(envHmsUriKey, hmsUri);
     spark.conf().set(catalogHmsUriKey, hmsUri);
+  }
+
+  @Test
+  public void testValidateHmsUri() {
+    // HMS uris match
     Assert.assertTrue(
         spark
             .sessionState()
@@ -86,4 +95,19 @@ public class TestSparkSessionCatalog extends SparkTestBase {
             .defaultNamespace()[0]
             .equals("default"));
   }
+
+  @Test
+  public void testLoadFunction() {
+    String functionClass = 
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper";
+
+    // load permanent UDF in Hive via FunctionCatalog
+    spark.sql(String.format("CREATE FUNCTION perm_upper AS '%s'", 
functionClass));
+    Assert.assertEquals("Load permanent UDF in Hive", "XYZ", scalarSql("SELECT 
perm_upper('xyz')"));
+
+    // load temporary UDF in Hive via FunctionCatalog
+    spark.sql(String.format("CREATE TEMPORARY FUNCTION temp_upper AS '%s'", 
functionClass));
+    Assert.assertEquals("Load temporary UDF in Hive", "XYZ", scalarSql("SELECT 
temp_upper('xyz')"));
+
+    // TODO: fix loading Iceberg built-in functions in SessionCatalog
+  }
 }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 444244e38b..c891985b38 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -27,12 +27,14 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.source.HasIcebergCatalog;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.CatalogExtension;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.StagedTable;
@@ -41,6 +43,7 @@ import 
org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -48,10 +51,11 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 /**
  * A Spark catalog that can also load non-Iceberg tables.
  *
- * @param <T> CatalogPlugin class to avoid casting to TableCatalog and 
SupportsNamespaces.
+ * @param <T> CatalogPlugin class to avoid casting to TableCatalog, 
FunctionCatalog and
+ *     SupportsNamespaces.
  */
-public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces> 
extends BaseCatalog
-    implements CatalogExtension {
+public class SparkSessionCatalog<T extends TableCatalog & FunctionCatalog & 
SupportsNamespaces>
+    extends BaseCatalog implements CatalogExtension {
   private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
 
   private String catalogName = null;
@@ -336,6 +340,7 @@ public class SparkSessionCatalog<T extends TableCatalog & 
SupportsNamespaces> ex
   @SuppressWarnings("unchecked")
   public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
     if (sparkSessionCatalog instanceof TableCatalog
+        && sparkSessionCatalog instanceof FunctionCatalog
         && sparkSessionCatalog instanceof SupportsNamespaces) {
       this.sessionCatalog = (T) sparkSessionCatalog;
     } else {
@@ -377,4 +382,13 @@ public class SparkSessionCatalog<T extends TableCatalog & 
SupportsNamespaces> ex
         "Cannot return underlying Iceberg Catalog, wrapped catalog does not 
contain an Iceberg Catalog");
     return ((HasIcebergCatalog) icebergCatalog).icebergCatalog();
   }
+
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws 
NoSuchFunctionException {
+    try {
+      return super.loadFunction(ident);
+    } catch (NoSuchFunctionException e) {
+      return getSessionCatalog().loadFunction(ident);
+    }
+  }
 }
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
index 6737dc64ff..82a2fb4733 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
@@ -21,24 +21,33 @@ package org.apache.iceberg.spark;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestSparkSessionCatalog extends SparkTestBase {
-  @Test
-  public void testValidateHmsUri() {
-    String envHmsUriKey = "spark.hadoop." + METASTOREURIS.varname;
-    String catalogHmsUriKey = "spark.sql.catalog.spark_catalog.uri";
-    String hmsUri = hiveConf.get(METASTOREURIS.varname);
+  private final String envHmsUriKey = "spark.hadoop." + METASTOREURIS.varname;
+  private final String catalogHmsUriKey = 
"spark.sql.catalog.spark_catalog.uri";
+  private final String hmsUri = hiveConf.get(METASTOREURIS.varname);
 
+  @BeforeClass
+  public static void setUpCatalog() {
     spark
         .conf()
         .set("spark.sql.catalog.spark_catalog", 
"org.apache.iceberg.spark.SparkSessionCatalog");
     spark.conf().set("spark.sql.catalog.spark_catalog.type", "hive");
+  }
 
-    // HMS uris match
+  @Before
+  public void setupHmsUri() {
     spark.sessionState().catalogManager().reset();
     spark.conf().set(envHmsUriKey, hmsUri);
     spark.conf().set(catalogHmsUriKey, hmsUri);
+  }
+
+  @Test
+  public void testValidateHmsUri() {
+    // HMS uris match
     Assert.assertTrue(
         spark
             .sessionState()
@@ -86,4 +95,19 @@ public class TestSparkSessionCatalog extends SparkTestBase {
             .defaultNamespace()[0]
             .equals("default"));
   }
+
+  @Test
+  public void testLoadFunction() {
+    String functionClass = 
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper";
+
+    // load permanent UDF in Hive via FunctionCatalog
+    spark.sql(String.format("CREATE FUNCTION perm_upper AS '%s'", 
functionClass));
+    Assert.assertEquals("Load permanent UDF in Hive", "XYZ", scalarSql("SELECT 
perm_upper('xyz')"));
+
+    // load temporary UDF in Hive via FunctionCatalog
+    spark.sql(String.format("CREATE TEMPORARY FUNCTION temp_upper AS '%s'", 
functionClass));
+    Assert.assertEquals("Load temporary UDF in Hive", "XYZ", scalarSql("SELECT 
temp_upper('xyz')"));
+
+    // TODO: fix loading Iceberg built-in functions in SessionCatalog
+  }
 }
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 31d5c38e2a..0c6cad7f36 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -26,12 +26,13 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 
-public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces>
+public class TestSparkCatalog<T extends TableCatalog & FunctionCatalog & 
SupportsNamespaces>
     extends SparkSessionCatalog<T> {
 
   private static final Map<Identifier, Table> tableMap = Maps.newHashMap();

Reply via email to the mailing list.