Copilot commented on code in PR #9580:
URL: https://github.com/apache/gravitino/pull/9580#discussion_r2776590466
##########
spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java:
##########
@@ -437,17 +444,72 @@ protected String getDatabase(Identifier sparkIdentifier) {
return getCatalogDefaultNamespace();
}
+ @Override
+ public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+ String gravitinoNamespace;
+ if (namespace.length == 0) {
+ gravitinoNamespace = getCatalogDefaultNamespace();
+ } else {
+ validateNamespace(namespace);
+ gravitinoNamespace = namespace[0];
+ }
+ try {
+ Function[] functions =
+ gravitinoCatalogClient
+ .asFunctionCatalog()
+ .listFunctionInfos(Namespace.of(gravitinoNamespace));
+ // Filter functions that have Spark runtime implementation
+ return Arrays.stream(functions)
+ .filter(this::hasSparkImplementation)
+ .map(f -> Identifier.of(new String[] {gravitinoNamespace}, f.name()))
+ .toArray(Identifier[]::new);
+ } catch (NoSuchSchemaException e) {
+ return new Identifier[0];
Review Comment:
`listFunctions` swallows `NoSuchSchemaException` and returns an empty array.
This is inconsistent with `listTables` (which translates
`NoSuchSchemaException` to Spark `NoSuchNamespaceException`) and can cause
`SHOW ... FUNCTIONS` against a nonexistent schema to succeed silently. Consider
throwing `NoSuchNamespaceException` for missing namespaces instead of returning
empty results.
```suggestion
throw new NoSuchNamespaceException(namespace);
```
##########
spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java:
##########
@@ -109,12 +110,33 @@ protected SparkTransformConverter getSparkTransformConverter() {
@Override
public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
- return ((SparkCatalog) sparkCatalog).listFunctions(namespace);
+ // Get functions from Iceberg catalog
+ Identifier[] icebergFunctions = ((SparkCatalog) sparkCatalog).listFunctions(namespace);
+
+ // When the namespace is empty, to maintain compatibility with Iceberg behavior, only Iceberg
+ // functions are returned.
+ Identifier[] gravitinoFunctions =
+ namespace.length == 0 ? new Identifier[0] : super.listFunctions(namespace);
+
+ // Combine and return both sets of functions
+ Identifier[] allFunctions = new Identifier[icebergFunctions.length + gravitinoFunctions.length];
+ System.arraycopy(icebergFunctions, 0, allFunctions, 0, icebergFunctions.length);
+ System.arraycopy(
+ gravitinoFunctions, 0, allFunctions, icebergFunctions.length, gravitinoFunctions.length);
+ return allFunctions;
Review Comment:
`listFunctions` concatenates the Iceberg and Gravitino results without
de-duplicating. If a function name exists in both catalogs, Spark may see
duplicate or ambiguous entries. Consider de-duplicating (and defining a
precedence) when merging the two `Identifier` arrays.
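A minimal sketch of one way to merge with de-duplication (the helper name and
the Iceberg-first precedence are assumptions, not something this PR defines):
```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.spark.sql.connector.catalog.Identifier;

// Hypothetical helper: merge two identifier arrays, de-duplicating on
// Identifier equality (Spark's IdentifierImpl implements equals/hashCode).
// LinkedHashSet preserves insertion order, so the first array wins on name
// collisions; here Iceberg functions take precedence (an assumed policy).
private static Identifier[] mergeFunctionIdentifiers(
    Identifier[] icebergFunctions, Identifier[] gravitinoFunctions) {
  Set<Identifier> merged = new LinkedHashSet<>();
  Collections.addAll(merged, icebergFunctions);
  Collections.addAll(merged, gravitinoFunctions);
  return merged.toArray(new Identifier[0]);
}
```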
##########
spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java:
##########
@@ -470,6 +532,44 @@ private String getDatabase(NameIdentifier gravitinoIdentifier) {
return gravitinoIdentifier.namespace().level(0);
}
+ private boolean hasSparkImplementation(Function function) {
+ for (FunctionDefinition definition : function.definitions()) {
+ for (FunctionImpl impl : definition.impls()) {
+ if (isSparkImplementation(impl)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean isSparkImplementation(FunctionImpl impl) {
+ return FunctionImpl.RuntimeType.SPARK.equals(impl.runtime());
+ }
+
+ private String extractClassName(FunctionImpl impl) {
+ if (impl instanceof JavaImpl) {
+ return ((JavaImpl) impl).className();
+ }
+ throw new IllegalArgumentException(
+ String.format("Unsupported function implementation %s",
impl.getClass().getName()));
+ }
+
+ private UnboundFunction instantiateFunction(String className, Identifier ident)
+ throws NoSuchFunctionException {
+ try {
+ Class<?> functionClass = Class.forName(className);
+ Object instance = functionClass.getDeclaredConstructor().newInstance();
+ if (instance instanceof UnboundFunction) {
+ return (UnboundFunction) instance;
+ }
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(
+ String.format("Failed to instantiate function class: %s",
className), e);
+ }
Review Comment:
`instantiateFunction` uses `Class.forName(className)` without specifying a
classloader and wraps reflection failures in a generic `RuntimeException`. In
Spark (and elsewhere in this repo), dynamic loads typically use the thread
context classloader to work with isolated/plugin classloaders (see
`core/.../connector/BaseCatalog#loadCustomOps`). Consider loading via
`Thread.currentThread().getContextClassLoader()` (or Spark's classloader
utilities if available) and surfacing a Spark-friendly exception instead of a
bare `RuntimeException`.
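A minimal sketch of that shape, assuming Spark's
`NoSuchFunctionException(Identifier)` constructor is available in the Spark
versions this module targets:
```java
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;

private UnboundFunction instantiateFunction(String className, Identifier ident)
    throws NoSuchFunctionException {
  try {
    // Prefer the thread context classloader so isolated/plugin classloaders
    // are honored; fall back to this class's own loader if none is set.
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    if (classLoader == null) {
      classLoader = getClass().getClassLoader();
    }
    Class<?> functionClass = Class.forName(className, true, classLoader);
    Object instance = functionClass.getDeclaredConstructor().newInstance();
    if (instance instanceof UnboundFunction) {
      return (UnboundFunction) instance;
    }
    // Loaded a class that is not an UnboundFunction: surface a
    // Spark-friendly exception instead of a bare RuntimeException.
    throw new NoSuchFunctionException(ident);
  } catch (ReflectiveOperationException e) {
    throw new NoSuchFunctionException(ident);
  }
}
```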
##########
spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java:
##########
@@ -470,6 +532,44 @@ private String getDatabase(NameIdentifier gravitinoIdentifier) {
return gravitinoIdentifier.namespace().level(0);
}
+ private boolean hasSparkImplementation(Function function) {
+ for (FunctionDefinition definition : function.definitions()) {
+ for (FunctionImpl impl : definition.impls()) {
+ if (isSparkImplementation(impl)) {
+ return true;
+ }
+ }
+ }
+ return false;
Review Comment:
`hasSparkImplementation` only checks `impl.runtime()==SPARK`. If a function
has a SPARK runtime impl that isn't a `JavaImpl` (or has a blank className), it
will be listed by `listFunctions` but later `loadFunction` will throw (via
`extractClassName`) or fail to instantiate. Consider tightening the filter to
only include loadable Spark implementations (e.g., `JavaImpl` with non-blank
className) or making `extractClassName`/`loadFunction` skip unsupported impls
instead of throwing.
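A minimal sketch of the tightened check (`StringUtils` from commons-lang3 is
an assumption, though `ArrayUtils` from the same library already appears in
this diff):
```java
import org.apache.commons.lang3.StringUtils;

// Only report implementations that loadFunction can actually load later:
// a SPARK runtime backed by a JavaImpl with a non-blank class name.
private boolean isLoadableSparkImplementation(FunctionImpl impl) {
  return FunctionImpl.RuntimeType.SPARK.equals(impl.runtime())
      && impl instanceof JavaImpl
      && StringUtils.isNotBlank(((JavaImpl) impl).className());
}
```
`hasSparkImplementation` can then filter on this predicate so that
`listFunctions` and `loadFunction` agree on what counts as a Spark function.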
##########
spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java:
##########
@@ -109,12 +110,33 @@ protected SparkTransformConverter getSparkTransformConverter() {
@Override
public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
- return ((SparkCatalog) sparkCatalog).listFunctions(namespace);
+ // Get functions from Iceberg catalog
+ Identifier[] icebergFunctions = ((SparkCatalog) sparkCatalog).listFunctions(namespace);
+
+ // When the namespace is empty, to maintain compatibility with Iceberg behavior, only Iceberg
+ // functions are returned.
+ Identifier[] gravitinoFunctions =
+ namespace.length == 0 ? new Identifier[0] : super.listFunctions(namespace);
+
+ // Combine and return both sets of functions
+ Identifier[] allFunctions = new Identifier[icebergFunctions.length + gravitinoFunctions.length];
+ System.arraycopy(icebergFunctions, 0, allFunctions, 0, icebergFunctions.length);
+ System.arraycopy(
+ gravitinoFunctions, 0, allFunctions, icebergFunctions.length, gravitinoFunctions.length);
+ return allFunctions;
}
@Override
public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
- return ((SparkCatalog) sparkCatalog).loadFunction(ident);
+ try {
+ // When the namespace is empty, to maintain compatibility with Iceberg behavior, only Iceberg
+ // functions are returned.
+ return ident.namespace().length == 0 &&
ArrayUtils.isEmpty(sparkCatalog.defaultNamespace())
+ ? ((SparkCatalog) sparkCatalog).loadFunction(ident)
+ : super.loadFunction(ident);
Review Comment:
The PR special-cases the empty namespace in `listFunctions` (returning only
Iceberg functions), but `loadFunction` may still attempt
`super.loadFunction` for unqualified identifiers depending on
`defaultNamespace()`. This can make unqualified function resolution differ from
listing behavior and from the stated compatibility comment. Consider making the
empty-namespace case consistently delegate directly to the underlying Iceberg
catalog, and only fall back to Gravitino functions when an explicit namespace
is provided.
```suggestion
String[] namespace = ident.namespace();
// When the namespace is empty, to maintain compatibility with Iceberg behavior, functions
// should be loaded only from the underlying Iceberg catalog.
if (namespace == null || namespace.length == 0) {
return ((SparkCatalog) sparkCatalog).loadFunction(ident);
}
try {
return super.loadFunction(ident);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]