This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1fd1c18 [FLINK-14219][table] support ambiguous function reference
1fd1c18 is described below
commit 1fd1c18ce6ca73d7444645e2364c41867279f6c5
Author: bowen.li <[email protected]>
AuthorDate: Tue Oct 29 15:53:27 2019 -0700
[FLINK-14219][table] support ambiguous function reference
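Support ambiguous function references, i.e. functions referenced by simple name only. They are resolved in this order: temporary system functions, system functions, temporary catalog functions, then catalog functions (the latter two looked up in the current catalog and database).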
This closes #10052.
---
.../flink/table/catalog/FunctionCatalog.java | 72 ++++++----------
.../flink/table/catalog/FunctionCatalogTest.java | 98 ++++++++++++++++++++--
2 files changed, 117 insertions(+), 53 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 2b3d6dc..29208e6 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -250,51 +250,7 @@ public class FunctionCatalog implements FunctionLookup {
return
resolvePreciseFunctionReference(identifier.getIdentifier().get());
} else {
// ambiguous function reference
-
- String functionName = identifier.getSimpleName().get();
-
- FunctionDefinition userCandidate;
-
- Catalog catalog =
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
- try {
- CatalogFunction catalogFunction =
catalog.getFunction(
- new
ObjectPath(catalogManager.getCurrentDatabase(), functionName)
- );
-
- if
(catalog.getFunctionDefinitionFactory().isPresent()) {
- userCandidate =
catalog.getFunctionDefinitionFactory().get().createFunctionDefinition(functionName,
catalogFunction);
- } else {
- userCandidate =
FunctionDefinitionUtil.createFunctionDefinition(functionName, catalogFunction);
- }
-
- return Optional.of(
- new FunctionLookup.Result(
- FunctionIdentifier.of(
- ObjectIdentifier.of(
-
catalogManager.getCurrentCatalog(),
-
catalogManager.getCurrentDatabase(),
- functionName)),
- userCandidate)
- );
-
- } catch (FunctionNotExistException e) {
- // ignore
- }
-
- // If no corresponding function is found in catalog,
check in-memory functions
- userCandidate = tempSystemFunctions.get(functionName);
-
- final Optional<FunctionDefinition> foundDefinition;
- if (userCandidate != null) {
- foundDefinition = Optional.of(userCandidate);
- } else {
- foundDefinition =
moduleManager.getFunctionDefinition(functionName);
- }
-
- return foundDefinition.map(d -> new
FunctionLookup.Result(
-
FunctionIdentifier.of(identifier.getSimpleName().get()),
- d)
- );
+ return
resolveAmbiguousFunctionReference(identifier.getSimpleName().get());
}
}
@@ -339,7 +295,31 @@ public class FunctionCatalog implements FunctionLookup {
return Optional.empty();
}
- @Override
+ private Optional<FunctionLookup.Result>
resolveAmbiguousFunctionReference(String funcName) {
+ // resolve order:
+ // 1. Temporary system functions
+ // 2. System functions
+ // 3. Temporary catalog functions
+ // 4. Catalog functions
+
+ if (tempSystemFunctions.containsKey(funcName)) {
+ return Optional.of(
+ new
FunctionLookup.Result(FunctionIdentifier.of(funcName),
tempSystemFunctions.get(funcName))
+ );
+ }
+
+ Optional<FunctionDefinition> candidate =
moduleManager.getFunctionDefinition(funcName);
+ if (candidate.isPresent()) {
+ return Optional.of(
+ new
FunctionLookup.Result(FunctionIdentifier.of(funcName), candidate.get())
+ );
+ }
+
+ return resolvePreciseFunctionReference(
+ ObjectIdentifier.of(catalogManager.getCurrentCatalog(),
catalogManager.getCurrentDatabase(), funcName));
+ }
+
+ @Override
public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() {
Preconditions.checkNotNull(
plannerTypeInferenceUtil,
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index 2461cdc..ee94c00 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -21,21 +21,24 @@ package org.apache.flink.table.catalog;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.module.exceptions.ModuleAlreadyExistException;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
@@ -44,13 +47,18 @@ import static org.junit.Assert.assertTrue;
public class FunctionCatalogTest {
private FunctionCatalog functionCatalog;
private Catalog catalog;
+ private ModuleManager moduleManager;
+
+ private final String testCatalogName = "test";
+
+ private static final String TEST_FUNCTION_NAME = "test_function";
@Before
public void init() throws DatabaseAlreadyExistException {
- catalog = new GenericInMemoryCatalog("test");
- catalog.createDatabase("test", new
CatalogDatabaseImpl(Collections.EMPTY_MAP, null), false);
+ catalog = new GenericInMemoryCatalog(testCatalogName);
+ moduleManager = new ModuleManager();
functionCatalog = new FunctionCatalog(
- new CatalogManager("test", catalog), new
ModuleManager());
+ new CatalogManager(testCatalogName, catalog),
moduleManager);
}
@Test
@@ -65,7 +73,7 @@ public class FunctionCatalogTest {
@Test
public void testPreciseFunctionReference() throws
FunctionAlreadyExistException, DatabaseNotExistException {
- ObjectIdentifier oi = ObjectIdentifier.of("test", "test",
"test_function");
+ ObjectIdentifier oi = ObjectIdentifier.of(testCatalogName,
GenericInMemoryCatalog.DEFAULT_DB, TEST_FUNCTION_NAME);
// test no function is found
assertFalse(functionCatalog.lookupFunction(FunctionIdentifier.of(oi)).isPresent());
@@ -79,7 +87,6 @@ public class FunctionCatalogTest {
assertFalse(result.getFunctionIdentifier().getSimpleName().isPresent());
assertEquals(oi,
result.getFunctionIdentifier().getIdentifier().get());
- assertNotNull(result.getFunctionDefinition());
assertTrue(((ScalarFunctionDefinition)
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction1);
// test temp catalog function is found
@@ -92,10 +99,73 @@ public class FunctionCatalogTest {
assertFalse(result.getFunctionIdentifier().getSimpleName().isPresent());
assertEquals(oi,
result.getFunctionIdentifier().getIdentifier().get());
- assertNotNull(result.getFunctionDefinition());
assertTrue(((ScalarFunctionDefinition)
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction2);
}
+ @Test
+ public void testAmbiguousFunctionReference() throws
FunctionAlreadyExistException, DatabaseNotExistException,
ModuleAlreadyExistException {
+ ObjectIdentifier oi = ObjectIdentifier.of(
+ testCatalogName,
+ GenericInMemoryCatalog.DEFAULT_DB,
+ TEST_FUNCTION_NAME);
+
+ // test no function is found
+
assertFalse(functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).isPresent());
+
+ // test catalog function is found
+ catalog.createFunction(
+ oi.toObjectPath(),
+ new CatalogFunctionImpl(TestFunction1.class.getName(),
Collections.emptyMap()), false);
+
+ FunctionLookup.Result result =
functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).get();
+
+
assertFalse(result.getFunctionIdentifier().getSimpleName().isPresent());
+ assertEquals(oi,
result.getFunctionIdentifier().getIdentifier().get());
+ assertTrue(((ScalarFunctionDefinition)
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction1);
+
+ // test temp catalog function is found
+ functionCatalog.registerTempCatalogScalarFunction(
+ oi,
+ new TestFunction2()
+ );
+
+ result =
functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).get();
+
+
assertFalse(result.getFunctionIdentifier().getSimpleName().isPresent());
+ assertEquals(oi,
result.getFunctionIdentifier().getIdentifier().get());
+ assertTrue(((ScalarFunctionDefinition)
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction2);
+
+ // test system function is found
+ moduleManager.loadModule("test_module", new TestModule());
+
+ result =
functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).get();
+
+ assertEquals(TEST_FUNCTION_NAME,
result.getFunctionIdentifier().getSimpleName().get());
+ assertTrue(((ScalarFunctionDefinition)
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction3);
+
+ // test temp system function is found
+
functionCatalog.registerTempSystemScalarFunction(TEST_FUNCTION_NAME, new
TestFunction4());
+
+ result =
functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).get();
+
+ assertEquals(TEST_FUNCTION_NAME,
result.getFunctionIdentifier().getSimpleName().get());
+ assertTrue(((ScalarFunctionDefinition)
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction4);
+ }
+
+ private static class TestModule implements Module {
+ @Override
+ public Set<String> listFunctions() {
+ return new HashSet<String>() {{
+ add(TEST_FUNCTION_NAME);
+ }};
+ }
+
+ @Override
+ public Optional<FunctionDefinition>
getFunctionDefinition(String name) {
+ return Optional.of(new
ScalarFunctionDefinition(TEST_FUNCTION_NAME, new TestFunction3()));
+ }
+ }
+
/**
* Testing function.
*/
@@ -109,4 +179,18 @@ public class FunctionCatalogTest {
public static class TestFunction2 extends ScalarFunction {
}
+
+ /**
+ * Testing function.
+ */
+ public static class TestFunction3 extends ScalarFunction {
+
+ }
+
+ /**
+ * Testing function.
+ */
+ public static class TestFunction4 extends ScalarFunction {
+
+ }
}