This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 07b0a15130 Spark 3.2, 3.3: SparkSessionCatalog should delegate load
function if Iceberg function is not found (#7153)
07b0a15130 is described below
commit 07b0a15130db1592fcd1c43c8d872e91c2901f22
Author: Bowen Liang <[email protected]>
AuthorDate: Sun Mar 26 09:15:58 2023 +0800
Spark 3.2, 3.3: SparkSessionCatalog should delegate load function if
Iceberg function is not found (#7153)
---
.../apache/iceberg/spark/SparkSessionCatalog.java | 15 +++++++++
.../iceberg/spark/TestSparkSessionCatalog.java | 36 ++++++++++++++++++----
.../apache/iceberg/spark/SparkSessionCatalog.java | 20 ++++++++++--
.../iceberg/spark/TestSparkSessionCatalog.java | 36 ++++++++++++++++++----
.../iceberg/spark/source/TestSparkCatalog.java | 3 +-
5 files changed, 94 insertions(+), 16 deletions(-)
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 68c7211311..c041df2ae3 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -27,11 +27,13 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.source.HasIcebergCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.CatalogExtension;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.StagedTable;
@@ -40,6 +42,7 @@ import
org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -352,4 +355,16 @@ public class SparkSessionCatalog<T extends TableCatalog &
SupportsNamespaces> ex
"Cannot return underlying Iceberg Catalog, wrapped catalog does not
contain an Iceberg Catalog");
return ((HasIcebergCatalog) icebergCatalog).icebergCatalog();
}
+
+  /**
+   * Loads a function, trying {@code super.loadFunction} first and falling back to the wrapped
+   * Spark session catalog when that lookup does not find it.
+   *
+   * @param ident identifier of the function to load
+   * @return the function found by the parent lookup or, failing that, by the session catalog
+   * @throws NoSuchFunctionException if the parent lookup fails and the session catalog either is
+   *     not a {@link FunctionCatalog} or does not know the function
+   */
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+    try {
+      return super.loadFunction(ident);
+    } catch (NoSuchFunctionException e) {
+      // The delegate's type parameter is not bound to FunctionCatalog in this version, so the
+      // capability has to be checked at runtime before delegating.
+      if (getSessionCatalog() instanceof FunctionCatalog) {
+        return ((FunctionCatalog) getSessionCatalog()).loadFunction(ident);
+      }
+
+      // Nothing left to ask: surface the original failure instead of replacing it with a new
+      // exception, so its message and stack trace are preserved.
+      throw e;
+    }
+  }
}
diff --git
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
index 6737dc64ff..82a2fb4733 100644
---
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
+++
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
@@ -21,24 +21,33 @@ package org.apache.iceberg.spark;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestSparkSessionCatalog extends SparkTestBase {
- @Test
- public void testValidateHmsUri() {
- String envHmsUriKey = "spark.hadoop." + METASTOREURIS.varname;
- String catalogHmsUriKey = "spark.sql.catalog.spark_catalog.uri";
- String hmsUri = hiveConf.get(METASTOREURIS.varname);
+ private final String envHmsUriKey = "spark.hadoop." + METASTOREURIS.varname;
+ private final String catalogHmsUriKey =
"spark.sql.catalog.spark_catalog.uri";
+ private final String hmsUri = hiveConf.get(METASTOREURIS.varname);
+ @BeforeClass
+ public static void setUpCatalog() {
spark
.conf()
.set("spark.sql.catalog.spark_catalog",
"org.apache.iceberg.spark.SparkSessionCatalog");
spark.conf().set("spark.sql.catalog.spark_catalog.type", "hive");
+ }
- // HMS uris match
+ @Before
+ public void setupHmsUri() {
spark.sessionState().catalogManager().reset();
spark.conf().set(envHmsUriKey, hmsUri);
spark.conf().set(catalogHmsUriKey, hmsUri);
+ }
+
+ @Test
+ public void testValidateHmsUri() {
+ // HMS uris match
Assert.assertTrue(
spark
.sessionState()
@@ -86,4 +95,19 @@ public class TestSparkSessionCatalog extends SparkTestBase {
.defaultNamespace()[0]
.equals("default"));
}
+
+  /**
+   * Verifies that permanent and temporary Hive UDFs resolve through the session catalog, and
+   * drops both functions afterwards so no state leaks into other tests or reruns.
+   */
+  @Test
+  public void testLoadFunction() {
+    String functionClass = "org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper";
+
+    try {
+      // load permanent UDF in Hive via FunctionCatalog
+      spark.sql(String.format("CREATE FUNCTION perm_upper AS '%s'", functionClass));
+      Assert.assertEquals(
+          "Load permanent UDF in Hive", "XYZ", scalarSql("SELECT perm_upper('xyz')"));
+
+      // load temporary UDF in Hive via FunctionCatalog
+      spark.sql(String.format("CREATE TEMPORARY FUNCTION temp_upper AS '%s'", functionClass));
+      Assert.assertEquals(
+          "Load temporary UDF in Hive", "XYZ", scalarSql("SELECT temp_upper('xyz')"));
+
+      // TODO: fix loading Iceberg built-in functions in SessionCatalog
+    } finally {
+      // IF EXISTS keeps cleanup safe even when one of the CREATE statements above failed
+      spark.sql("DROP FUNCTION IF EXISTS perm_upper");
+      spark.sql("DROP TEMPORARY FUNCTION IF EXISTS temp_upper");
+    }
+  }
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 444244e38b..c891985b38 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -27,12 +27,14 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.source.HasIcebergCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.CatalogExtension;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.StagedTable;
@@ -41,6 +43,7 @@ import
org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -48,10 +51,11 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* A Spark catalog that can also load non-Iceberg tables.
*
- * @param <T> CatalogPlugin class to avoid casting to TableCatalog and
SupportsNamespaces.
+ * @param <T> CatalogPlugin class to avoid casting to TableCatalog,
FunctionCatalog and
+ * SupportsNamespaces.
*/
-public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
extends BaseCatalog
- implements CatalogExtension {
+public class SparkSessionCatalog<T extends TableCatalog & FunctionCatalog &
SupportsNamespaces>
+ extends BaseCatalog implements CatalogExtension {
private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
private String catalogName = null;
@@ -336,6 +340,7 @@ public class SparkSessionCatalog<T extends TableCatalog &
SupportsNamespaces> ex
@SuppressWarnings("unchecked")
public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
if (sparkSessionCatalog instanceof TableCatalog
+ && sparkSessionCatalog instanceof FunctionCatalog
&& sparkSessionCatalog instanceof SupportsNamespaces) {
this.sessionCatalog = (T) sparkSessionCatalog;
} else {
@@ -377,4 +382,13 @@ public class SparkSessionCatalog<T extends TableCatalog &
SupportsNamespaces> ex
"Cannot return underlying Iceberg Catalog, wrapped catalog does not
contain an Iceberg Catalog");
return ((HasIcebergCatalog) icebergCatalog).icebergCatalog();
}
+
+  /**
+   * Loads a function, trying {@code super.loadFunction} first and delegating to the wrapped
+   * Spark session catalog when that lookup throws {@link NoSuchFunctionException}.
+   *
+   * @param ident identifier of the function to load
+   * @return the function found by the parent lookup or, failing that, by the session catalog
+   * @throws NoSuchFunctionException if the session catalog cannot find the function either
+   */
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+    UnboundFunction function;
+    try {
+      function = super.loadFunction(ident);
+    } catch (NoSuchFunctionException notFoundByParent) {
+      // The delegate type is bound to FunctionCatalog, so no runtime capability check is needed
+      function = getSessionCatalog().loadFunction(ident);
+    }
+
+    return function;
+  }
}
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
index 6737dc64ff..82a2fb4733 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
@@ -21,24 +21,33 @@ package org.apache.iceberg.spark;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestSparkSessionCatalog extends SparkTestBase {
- @Test
- public void testValidateHmsUri() {
- String envHmsUriKey = "spark.hadoop." + METASTOREURIS.varname;
- String catalogHmsUriKey = "spark.sql.catalog.spark_catalog.uri";
- String hmsUri = hiveConf.get(METASTOREURIS.varname);
+ private final String envHmsUriKey = "spark.hadoop." + METASTOREURIS.varname;
+ private final String catalogHmsUriKey =
"spark.sql.catalog.spark_catalog.uri";
+ private final String hmsUri = hiveConf.get(METASTOREURIS.varname);
+ @BeforeClass
+ public static void setUpCatalog() {
spark
.conf()
.set("spark.sql.catalog.spark_catalog",
"org.apache.iceberg.spark.SparkSessionCatalog");
spark.conf().set("spark.sql.catalog.spark_catalog.type", "hive");
+ }
- // HMS uris match
+ @Before
+ public void setupHmsUri() {
spark.sessionState().catalogManager().reset();
spark.conf().set(envHmsUriKey, hmsUri);
spark.conf().set(catalogHmsUriKey, hmsUri);
+ }
+
+ @Test
+ public void testValidateHmsUri() {
+ // HMS uris match
Assert.assertTrue(
spark
.sessionState()
@@ -86,4 +95,19 @@ public class TestSparkSessionCatalog extends SparkTestBase {
.defaultNamespace()[0]
.equals("default"));
}
+
+  /**
+   * Verifies that permanent and temporary Hive UDFs resolve through the session catalog, and
+   * drops both functions afterwards so no state leaks into other tests or reruns.
+   */
+  @Test
+  public void testLoadFunction() {
+    String functionClass = "org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper";
+
+    try {
+      // load permanent UDF in Hive via FunctionCatalog
+      spark.sql(String.format("CREATE FUNCTION perm_upper AS '%s'", functionClass));
+      Assert.assertEquals(
+          "Load permanent UDF in Hive", "XYZ", scalarSql("SELECT perm_upper('xyz')"));
+
+      // load temporary UDF in Hive via FunctionCatalog
+      spark.sql(String.format("CREATE TEMPORARY FUNCTION temp_upper AS '%s'", functionClass));
+      Assert.assertEquals(
+          "Load temporary UDF in Hive", "XYZ", scalarSql("SELECT temp_upper('xyz')"));
+
+      // TODO: fix loading Iceberg built-in functions in SessionCatalog
+    } finally {
+      // IF EXISTS keeps cleanup safe even when one of the CREATE statements above failed
+      spark.sql("DROP FUNCTION IF EXISTS perm_upper");
+      spark.sql("DROP TEMPORARY FUNCTION IF EXISTS temp_upper");
+    }
+  }
}
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 31d5c38e2a..0c6cad7f36 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -26,12 +26,13 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
-public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces>
+public class TestSparkCatalog<T extends TableCatalog & FunctionCatalog &
SupportsNamespaces>
extends SparkSessionCatalog<T> {
private static final Map<Identifier, Table> tableMap = Maps.newHashMap();