diqiu50 commented on code in PR #10494:
URL: https://github.com/apache/gravitino/pull/10494#discussion_r3000138023
##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java:
##########
@@ -679,4 +695,78 @@ private String getColumnName(
}
return internalMetadataColumnMetadata.getName();
}
+
+ @Override
+ public Collection<LanguageFunction> listLanguageFunctions(
+ ConnectorSession session, String schemaName) {
+ if (!catalogConnectorMetadata.supportsFunctions()) {
+ return List.of();
+ }
+ return
Arrays.stream(catalogConnectorMetadata.listFunctionInfos(schemaName))
+ .flatMap(function -> toLanguageFunctions(function).stream())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Collection<LanguageFunction> getLanguageFunctions(
+ ConnectorSession session, SchemaFunctionName name) {
+ if (!catalogConnectorMetadata.supportsFunctions()) {
+ return List.of();
+ }
+ try {
+ Function function =
+ catalogConnectorMetadata.getFunction(name.getSchemaName(),
name.getFunctionName());
+ if (function == null) {
+ return List.of();
+ }
+ return toLanguageFunctions(function);
+ } catch (NoSuchFunctionException e) {
+ LOG.debug("Function {} not found in schema {}", name.getFunctionName(),
name.getSchemaName());
+ return List.of();
+ }
+ }
+
+ /**
+ * Converts a Gravitino function to a collection of Trino LanguageFunction
instances. Only SQL
+ * implementations with TRINO runtime are included. Each definition with a
Trino SQL
+ * implementation produces one LanguageFunction. The signature token is
generated from the
+ * function name and parameter types.
+ */
+ private Collection<LanguageFunction> toLanguageFunctions(Function function) {
+ List<LanguageFunction> result = new ArrayList<>();
+ for (FunctionDefinition definition : function.definitions()) {
+ for (FunctionImpl impl : definition.impls()) {
+ if (!isTrinoSqlImplementation(impl)) {
+ continue;
+ }
+ String sql = ((SQLImpl) impl).sql();
+ String signatureToken = buildSignatureToken(function.name(),
definition.parameters());
Review Comment:
We need to catch exceptions when building the signature token. Otherwise, a
single failure would prevent all functions in the schema from being used.
##########
trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoUDFIT.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.integration.test;
+
+import static java.lang.Thread.sleep;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for Trino connector UDF adaptation. Verifies that
functions registered in
+ * Gravitino with TRINO runtime are visible via Trino's language function API.
+ */
+@Tag("gravitino-docker-test")
+public class TrinoUDFIT extends TrinoQueryITBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TrinoUDFIT.class);
+
+ private static final String CATALOG_NAME = "gt_hive_udf";
+ private static final String SCHEMA_NAME = "gt_udf_schema";
+ private static Catalog catalog;
+
+ @BeforeAll
+ public static void setUp() throws Exception {
+ TrinoUDFIT instance = new TrinoUDFIT();
+ instance.setup();
+
+ createHiveCatalog();
+ createSchema();
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ try {
+ cleanupFunctionsAndSchema();
+ dropCatalog(CATALOG_NAME);
+ } catch (Exception e) {
+ LOG.error("Error during teardown", e);
+ }
+ TrinoQueryITBase.cleanup();
+ }
+
+ private static void createHiveCatalog() throws Exception {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("metastore.uris", hiveMetastoreUri);
+
+ boolean exists = metalake.catalogExists(CATALOG_NAME);
+ if (!exists) {
+ metalake.createCatalog(
+ CATALOG_NAME, Catalog.Type.RELATIONAL, "hive", "UDF test catalog",
properties);
+ }
+
+ // Wait for catalog to sync to Trino
+ boolean catalogReady = false;
+ int tries = 180;
+ while (!catalogReady && tries-- >= 0) {
+ try {
+ String result = trinoQueryRunner.runQuery("show catalogs");
+ if (result.contains(metalakeName + "." + CATALOG_NAME)) {
+ catalogReady = true;
+ break;
+ }
+ } catch (Exception e) {
+ LOG.info("Waiting for catalog to sync to Trino");
+ }
+ sleep(1000);
+ }
+
+ if (!catalogReady) {
+ throw new Exception("Catalog " + CATALOG_NAME + " sync timeout");
+ }
+
+ catalog = metalake.loadCatalog(CATALOG_NAME);
+ }
+
+ private static void createSchema() {
+ boolean exists = catalog.asSchemas().schemaExists(SCHEMA_NAME);
+ if (!exists) {
+ catalog.asSchemas().createSchema(SCHEMA_NAME, "UDF test schema",
Collections.emptyMap());
+ }
+ }
+
+ private static void cleanupFunctionsAndSchema() {
+ try {
+ FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+ NameIdentifier[] functions =
functionCatalog.listFunctions(Namespace.of(SCHEMA_NAME));
+ for (NameIdentifier fn : functions) {
+ functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME,
fn.name()));
+ }
+ } catch (Exception e) {
+ LOG.error("Error cleaning up functions", e);
+ }
+
+ try {
+ catalog.asSchemas().dropSchema(SCHEMA_NAME, false);
+ } catch (Exception e) {
+ LOG.error("Error dropping schema", e);
+ }
+ }
+
+ @Test
+ public void testListLanguageFunctionsShowsRegisteredUDF() throws Exception {
+ String functionName = "test_add_one";
+ FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+ // Register a scalar function with TRINO runtime and SQL implementation
+ Function function =
+ functionCatalog.registerFunction(
+ NameIdentifier.of(SCHEMA_NAME, functionName),
+ "Adds one to input",
+ FunctionType.SCALAR,
+ true,
+ FunctionDefinitions.of(
+ FunctionDefinitions.of(
+ FunctionParams.of(FunctionParams.of("x",
Types.IntegerType.get())),
+ Types.IntegerType.get(),
+ FunctionImpls.of(
+ FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO,
"RETURN x + 1")))));
+ Assertions.assertNotNull(function);
Review Comment:
Please add comments explaining this function definition.
##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java:
##########
@@ -396,4 +412,46 @@ public void setColumnType(SchemaTableName schemaTableName,
String columnName, Ty
String[] columnNames = {columnName};
applyAlter(schemaTableName, TableChange.updateColumnType(columnNames,
type));
}
+
+ /**
+ * Checks whether the catalog supports function operations.
+ *
+ * @return true if the catalog supports function operations, false otherwise
+ */
+ public boolean supportsFunctions() {
+ return functionCatalog != null;
+ }
+
+ /**
+ * Lists all functions with details in the specified schema.
+ *
+ * @param schemaName the name of the schema
+ * @return an array of functions, or an empty array if functions are not
supported
+ */
+ public Function[] listFunctionInfos(String schemaName) {
+ if (!supportsFunctions()) {
+ return new Function[0];
Review Comment:
Throwing an exception is better. If supportsFunctions is false, it should
never reach this point.
##########
trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoUDFIT.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.integration.test;
+
+import static java.lang.Thread.sleep;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for Trino connector UDF adaptation. Verifies that
functions registered in
+ * Gravitino with TRINO runtime are visible via Trino's language function API.
+ */
+@Tag("gravitino-docker-test")
+public class TrinoUDFIT extends TrinoQueryITBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TrinoUDFIT.class);
+
+ private static final String CATALOG_NAME = "gt_hive_udf";
+ private static final String SCHEMA_NAME = "gt_udf_schema";
+ private static Catalog catalog;
+
+ @BeforeAll
+ public static void setUp() throws Exception {
+ TrinoUDFIT instance = new TrinoUDFIT();
+ instance.setup();
+
+ createHiveCatalog();
+ createSchema();
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ try {
+ cleanupFunctionsAndSchema();
+ dropCatalog(CATALOG_NAME);
+ } catch (Exception e) {
+ LOG.error("Error during teardown", e);
+ }
+ TrinoQueryITBase.cleanup();
+ }
+
+ private static void createHiveCatalog() throws Exception {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("metastore.uris", hiveMetastoreUri);
+
+ boolean exists = metalake.catalogExists(CATALOG_NAME);
+ if (!exists) {
+ metalake.createCatalog(
+ CATALOG_NAME, Catalog.Type.RELATIONAL, "hive", "UDF test catalog",
properties);
+ }
+
+ // Wait for catalog to sync to Trino
+ boolean catalogReady = false;
+ int tries = 180;
+ while (!catalogReady && tries-- >= 0) {
+ try {
+ String result = trinoQueryRunner.runQuery("show catalogs");
+ if (result.contains(metalakeName + "." + CATALOG_NAME)) {
+ catalogReady = true;
+ break;
+ }
+ } catch (Exception e) {
+ LOG.info("Waiting for catalog to sync to Trino");
+ }
+ sleep(1000);
+ }
+
+ if (!catalogReady) {
+ throw new Exception("Catalog " + CATALOG_NAME + " sync timeout");
+ }
+
+ catalog = metalake.loadCatalog(CATALOG_NAME);
+ }
+
+ private static void createSchema() {
+ boolean exists = catalog.asSchemas().schemaExists(SCHEMA_NAME);
+ if (!exists) {
+ catalog.asSchemas().createSchema(SCHEMA_NAME, "UDF test schema",
Collections.emptyMap());
+ }
+ }
+
+ private static void cleanupFunctionsAndSchema() {
+ try {
+ FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+ NameIdentifier[] functions =
functionCatalog.listFunctions(Namespace.of(SCHEMA_NAME));
+ for (NameIdentifier fn : functions) {
+ functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME,
fn.name()));
+ }
+ } catch (Exception e) {
+ LOG.error("Error cleaning up functions", e);
+ }
+
+ try {
+ catalog.asSchemas().dropSchema(SCHEMA_NAME, false);
+ } catch (Exception e) {
+ LOG.error("Error dropping schema", e);
+ }
+ }
+
+ @Test
+ public void testListLanguageFunctionsShowsRegisteredUDF() throws Exception {
+ String functionName = "test_add_one";
+ FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+ // Register a scalar function with TRINO runtime and SQL implementation
+ Function function =
+ functionCatalog.registerFunction(
+ NameIdentifier.of(SCHEMA_NAME, functionName),
+ "Adds one to input",
+ FunctionType.SCALAR,
+ true,
+ FunctionDefinitions.of(
+ FunctionDefinitions.of(
+ FunctionParams.of(FunctionParams.of("x",
Types.IntegerType.get())),
+ Types.IntegerType.get(),
+ FunctionImpls.of(
+ FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO,
"RETURN x + 1")))));
+ Assertions.assertNotNull(function);
+
+ // Query Trino to verify the function is listed
+ String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+ String showFunctionsQuery =
+ String.format("SHOW FUNCTIONS FROM %s.%s", trinoCatalogName,
SCHEMA_NAME);
+ String result = trinoQueryRunner.runQuery(showFunctionsQuery);
+
+ LOG.info("SHOW FUNCTIONS result: {}", result);
+ Assertions.assertTrue(
+ result.contains(functionName),
+ "Expected function " + functionName + " to be listed. Got: " + result);
+
+ // Cleanup
+ functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, functionName));
+ }
+
+ @Test
+ public void testListLanguageFunctionsFiltersNonTrinoRuntime() throws
Exception {
+ String functionName = "spark_only_func";
+ FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+ // Register a function with SPARK runtime - should NOT appear in Trino
+ Function function =
+ functionCatalog.registerFunction(
+ NameIdentifier.of(SCHEMA_NAME, functionName),
+ "Spark-only function",
+ FunctionType.SCALAR,
+ true,
+ FunctionDefinitions.of(
+ FunctionDefinitions.of(
+ FunctionParams.of(FunctionParams.of("x",
Types.IntegerType.get())),
+ Types.IntegerType.get(),
+ FunctionImpls.of(
+ FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK,
"RETURN x + 1")))));
+ Assertions.assertNotNull(function);
+
+ // Query Trino - SPARK runtime function should be filtered out
+ String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+ String showFunctionsQuery =
+ String.format("SHOW FUNCTIONS FROM %s.%s", trinoCatalogName,
SCHEMA_NAME);
+ String result = trinoQueryRunner.runQuery(showFunctionsQuery);
+
Review Comment:
Can we add a test case that executes SQL such as `SELECT test_add_one(5)`?
##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java:
##########
@@ -679,4 +695,78 @@ private String getColumnName(
}
return internalMetadataColumnMetadata.getName();
}
+
+ @Override
+ public Collection<LanguageFunction> listLanguageFunctions(
+ ConnectorSession session, String schemaName) {
+ if (!catalogConnectorMetadata.supportsFunctions()) {
+ return List.of();
+ }
+ return
Arrays.stream(catalogConnectorMetadata.listFunctionInfos(schemaName))
+ .flatMap(function -> toLanguageFunctions(function).stream())
+ .collect(Collectors.toList());
Review Comment:
`flatMap(...).toList()` can be used here instead of
`flatMap(...).collect(Collectors.toList())`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]