This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 05b0b61c624 [FLINK-33093] Support listing functions with unset catalog
(#23427)
05b0b61c624 is described below
commit 05b0b61c62434c73cd819750c0d56b1070a2b0f2
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Sep 19 12:38:34 2023 +0200
[FLINK-33093] Support listing functions with unset catalog (#23427)
---
.../flink/table/catalog/FunctionCatalog.java | 36 ++++++++++++++--------
.../table/planner/catalog/UnknownCatalogTest.java | 16 ++++++++++
2 files changed, 40 insertions(+), 12 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 883450eeb93..5ba7f831102 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -47,6 +47,7 @@ import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -305,21 +306,32 @@ public final class FunctionCatalog {
.map(FunctionIdentifier::of)
.collect(Collectors.toSet()));
- // add catalog functions
- Catalog catalog = catalogManager.getCatalog(catalogName).get();
+ // add catalog functions if catalog exists
+ catalogManager
+ .getCatalog(catalogName)
+ .ifPresent(
+ catalog ->
+ result.addAll(
+ getCatalogFunctions(catalog,
catalogName, databaseName)));
+
+ return result;
+ }
+
+ private static Collection<FunctionIdentifier> getCatalogFunctions(
+ Catalog catalog, String catalogName, String databaseName) {
try {
- catalog.listFunctions(databaseName)
- .forEach(
- name ->
- result.add(
- FunctionIdentifier.of(
- ObjectIdentifier.of(
- catalogName,
databaseName, name))));
+ return catalog.listFunctions(databaseName).stream()
+ .map(
+ name -> {
+ final ObjectIdentifier identifier =
+ ObjectIdentifier.of(catalogName,
databaseName, name);
+ return FunctionIdentifier.of(identifier);
+ })
+ .collect(Collectors.toList());
} catch (DatabaseNotExistException e) {
- // Ignore since there will always be a current database of the
current catalog
+ // if database does not exist, do not add catalog functions
+ return Collections.emptyList();
}
-
- return result;
}
/**
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
index d28d4bea153..b7d1446ba40 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.Column;
@@ -30,11 +31,13 @@ import
org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.STRING;
@@ -90,6 +93,19 @@ public class UnknownCatalogTest {
assertThat(table.getResolvedSchema()).isEqualTo(CURRENT_TIMESTAMP_EXPECTED_SCHEMA);
}
+ @Test
+ public void testUnsetCatalogWithShowFunctions() throws Exception {
+ TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);
+
+ tEnv.useCatalog(null);
+
+ TableResult table = tEnv.executeSql("SHOW FUNCTIONS");
+ final List<Row> functions =
CollectionUtil.iteratorToList(table.collect());
+
+ // check it has some built-in functions
+ assertThat(functions).hasSizeGreaterThan(0);
+ }
+
@Test
public void testUnsetCatalogWithFullyQualified() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);