This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fd1c18  [FLINK-14219][table] support ambiguous function reference
1fd1c18 is described below

commit 1fd1c18ce6ca73d7444645e2364c41867279f6c5
Author: bowen.li <[email protected]>
AuthorDate: Tue Oct 29 15:53:27 2019 -0700

    [FLINK-14219][table] support ambiguous function reference
    
    Support ambiguous function reference.
    
    This closes #10052.
---
 .../flink/table/catalog/FunctionCatalog.java       | 72 ++++++----------
 .../flink/table/catalog/FunctionCatalogTest.java   | 98 ++++++++++++++++++++--
 2 files changed, 117 insertions(+), 53 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 2b3d6dc..29208e6 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -250,51 +250,7 @@ public class FunctionCatalog implements FunctionLookup {
                        return 
resolvePreciseFunctionReference(identifier.getIdentifier().get());
                } else {
                        // ambiguous function reference
-
-                       String functionName = identifier.getSimpleName().get();
-
-                       FunctionDefinition userCandidate;
-
-                       Catalog catalog = 
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
-                       try {
-                               CatalogFunction catalogFunction = 
catalog.getFunction(
-                                       new 
ObjectPath(catalogManager.getCurrentDatabase(), functionName)
-                               );
-
-                               if 
(catalog.getFunctionDefinitionFactory().isPresent()) {
-                                       userCandidate = 
catalog.getFunctionDefinitionFactory().get().createFunctionDefinition(functionName,
 catalogFunction);
-                               } else {
-                                       userCandidate = 
FunctionDefinitionUtil.createFunctionDefinition(functionName, catalogFunction);
-                               }
-
-                               return Optional.of(
-                                       new FunctionLookup.Result(
-                                               FunctionIdentifier.of(
-                                                       ObjectIdentifier.of(
-                                                               
catalogManager.getCurrentCatalog(),
-                                                               
catalogManager.getCurrentDatabase(),
-                                                               functionName)),
-                                               userCandidate)
-                               );
-
-                       } catch (FunctionNotExistException e) {
-                               // ignore
-                       }
-
-                       // If no corresponding function is found in catalog, 
check in-memory functions
-                       userCandidate = tempSystemFunctions.get(functionName);
-
-                       final Optional<FunctionDefinition> foundDefinition;
-                       if (userCandidate != null) {
-                               foundDefinition = Optional.of(userCandidate);
-                       } else {
-                               foundDefinition = 
moduleManager.getFunctionDefinition(functionName);
-                       }
-
-                       return foundDefinition.map(d -> new 
FunctionLookup.Result(
-                               
FunctionIdentifier.of(identifier.getSimpleName().get()),
-                               d)
-                       );
+                       return 
resolveAmbiguousFunctionReference(identifier.getSimpleName().get());
                }
        }
 
@@ -339,7 +295,31 @@ public class FunctionCatalog implements FunctionLookup {
                return Optional.empty();
        }
 
-       @Override
+       private Optional<FunctionLookup.Result> 
resolveAmbiguousFunctionReference(String funcName) {
+               // resolve order:
+               // 1. Temporary system functions
+               // 2. System functions
+               // 3. Temporary catalog functions
+               // 4. Catalog functions
+
+               if (tempSystemFunctions.containsKey(funcName)) {
+                       return Optional.of(
+                               new 
FunctionLookup.Result(FunctionIdentifier.of(funcName), 
tempSystemFunctions.get(funcName))
+                       );
+               }
+
+               Optional<FunctionDefinition> candidate = 
moduleManager.getFunctionDefinition(funcName);
+               if (candidate.isPresent()) {
+                       return Optional.of(
+                               new 
FunctionLookup.Result(FunctionIdentifier.of(funcName), candidate.get())
+                       );
+               }
+
+               return resolvePreciseFunctionReference(
+                       ObjectIdentifier.of(catalogManager.getCurrentCatalog(), 
catalogManager.getCurrentDatabase(), funcName));
+       }
+
+               @Override
        public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() {
                Preconditions.checkNotNull(
                        plannerTypeInferenceUtil,
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index 2461cdc..ee94c00 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -21,21 +21,24 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.module.exceptions.ModuleAlreadyExistException;
 
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -44,13 +47,18 @@ import static org.junit.Assert.assertTrue;
 public class FunctionCatalogTest {
        private FunctionCatalog functionCatalog;
        private Catalog catalog;
+       private ModuleManager moduleManager;
+
+       private final String testCatalogName = "test";
+
+       private static final String TEST_FUNCTION_NAME = "test_function";
 
        @Before
        public void init() throws DatabaseAlreadyExistException {
-               catalog = new GenericInMemoryCatalog("test");
-               catalog.createDatabase("test", new 
CatalogDatabaseImpl(Collections.EMPTY_MAP, null), false);
+               catalog = new GenericInMemoryCatalog(testCatalogName);
+               moduleManager = new ModuleManager();
                functionCatalog = new FunctionCatalog(
-                       new CatalogManager("test", catalog), new 
ModuleManager());
+                       new CatalogManager(testCatalogName, catalog), 
moduleManager);
        }
 
        @Test
@@ -65,7 +73,7 @@ public class FunctionCatalogTest {
 
        @Test
        public void testPreciseFunctionReference() throws 
FunctionAlreadyExistException, DatabaseNotExistException {
-               ObjectIdentifier oi = ObjectIdentifier.of("test", "test", 
"test_function");
+               ObjectIdentifier oi = ObjectIdentifier.of(testCatalogName, 
GenericInMemoryCatalog.DEFAULT_DB, TEST_FUNCTION_NAME);
 
                // test no function is found
                
assertFalse(functionCatalog.lookupFunction(FunctionIdentifier.of(oi)).isPresent());
@@ -79,7 +87,6 @@ public class FunctionCatalogTest {
 
                
assertFalse(result.getFunctionIdentifier().getSimpleName().isPresent());
                assertEquals(oi, 
result.getFunctionIdentifier().getIdentifier().get());
-               assertNotNull(result.getFunctionDefinition());
                assertTrue(((ScalarFunctionDefinition) 
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction1);
 
                // test temp catalog function is found
@@ -92,10 +99,73 @@ public class FunctionCatalogTest {
 
                
assertFalse(result.getFunctionIdentifier().getSimpleName().isPresent());
                assertEquals(oi, 
result.getFunctionIdentifier().getIdentifier().get());
-               assertNotNull(result.getFunctionDefinition());
                assertTrue(((ScalarFunctionDefinition) 
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction2);
        }
 
+       @Test
+       public void testAmbiguousFunctionReference() throws 
FunctionAlreadyExistException, DatabaseNotExistException, 
ModuleAlreadyExistException {
+               ObjectIdentifier oi = ObjectIdentifier.of(
+                       testCatalogName,
+                       GenericInMemoryCatalog.DEFAULT_DB,
+                       TEST_FUNCTION_NAME);
+
+               // test no function is found
+               
assertFalse(functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).isPresent());
+
+               // test catalog function is found
+               catalog.createFunction(
+                       oi.toObjectPath(),
+                       new CatalogFunctionImpl(TestFunction1.class.getName(), 
Collections.emptyMap()), false);
+
+               FunctionLookup.Result result = 
functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).get();
+
+               
assertFalse(result.getFunctionIdentifier().getSimpleName().isPresent());
+               assertEquals(oi, 
result.getFunctionIdentifier().getIdentifier().get());
+               assertTrue(((ScalarFunctionDefinition) 
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction1);
+
+               // test temp catalog function is found
+               functionCatalog.registerTempCatalogScalarFunction(
+                       oi,
+                       new TestFunction2()
+               );
+
+               result = 
functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).get();
+
+               
assertFalse(result.getFunctionIdentifier().getSimpleName().isPresent());
+               assertEquals(oi, 
result.getFunctionIdentifier().getIdentifier().get());
+               assertTrue(((ScalarFunctionDefinition) 
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction2);
+
+               // test system function is found
+               moduleManager.loadModule("test_module", new TestModule());
+
+               result = 
functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).get();
+
+               assertEquals(TEST_FUNCTION_NAME, 
result.getFunctionIdentifier().getSimpleName().get());
+               assertTrue(((ScalarFunctionDefinition) 
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction3);
+
+               // test temp system function is found
+               
functionCatalog.registerTempSystemScalarFunction(TEST_FUNCTION_NAME, new 
TestFunction4());
+
+               result = 
functionCatalog.lookupFunction(FunctionIdentifier.of(TEST_FUNCTION_NAME)).get();
+
+               assertEquals(TEST_FUNCTION_NAME, 
result.getFunctionIdentifier().getSimpleName().get());
+               assertTrue(((ScalarFunctionDefinition) 
result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction4);
+       }
+
+       private static class TestModule implements Module {
+               @Override
+               public Set<String> listFunctions() {
+                       return new HashSet<String>() {{
+                               add(TEST_FUNCTION_NAME);
+                       }};
+               }
+
+               @Override
+               public Optional<FunctionDefinition> 
getFunctionDefinition(String name) {
+                       return Optional.of(new 
ScalarFunctionDefinition(TEST_FUNCTION_NAME, new TestFunction3()));
+               }
+       }
+
        /**
         * Testing function.
         */
@@ -109,4 +179,18 @@ public class FunctionCatalogTest {
        public static class TestFunction2 extends ScalarFunction {
 
        }
+
+       /**
+        * Testing function.
+        */
+       public static class TestFunction3 extends ScalarFunction {
+
+       }
+
+       /**
+        * Testing function.
+        */
+       public static class TestFunction4 extends ScalarFunction {
+
+       }
 }

Reply via email to