This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 40a0e71  [FLINK-17880][table] Use new inference for table/scalar function in catalogs
40a0e71 is described below

commit 40a0e7187067e2d56b3ac65958e010243af36fb7
Author: Timo Walther <[email protected]>
AuthorDate: Mon May 25 11:06:06 2020 +0200

    [FLINK-17880][table] Use new inference for table/scalar function in catalogs
    
    This updates catalog related operations (e.g. function DDL, SQL Client) to
    the new type inference for scalar and table functions. Hive functions and
    legacy planner still work with the old inference. With this commit we can
    tell an easier story about how to implement functions, because all main
    entrypoints support the new inference.
    
    This closes #12336.
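    
    For illustration only (hypothetical names, not part of this commit): a
    minimal sketch of the two registration paths that the SQL Client change
    below switches between. With the new FLIP-65 stack, argument and result
    types are extracted reflectively from the eval signature; the deprecated
    registerFunction call stays on the old type inference.
    
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.table.functions.ScalarFunction;
    
        public class NewInferenceExample {
    
                // No getResultType() override is needed anymore; the new type
                // inference derives STRING -> INT from the eval signature.
                public static class ParseIntFunction extends ScalarFunction {
                        public Integer eval(String s) {
                                return Integer.valueOf(s);
                        }
                }
    
                public static void main(String[] args) {
                        TableEnvironment tableEnv = TableEnvironment.create(
                                EnvironmentSettings.newInstance().useBlinkPlanner().build());
    
                        // new stack: FLIP-65 registration with the new type inference
                        tableEnv.createTemporarySystemFunction("parse_int", ParseIntFunction.class);
    
                        // legacy stack: deprecated registration with the old type inference
                        tableEnv.registerFunction("parse_int_legacy", new ParseIntFunction());
                }
        }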
---
 .../client/config/entries/ExecutionEntry.java      |   9 +
 .../client/gateway/local/ExecutionContext.java     |  30 ++-
 .../api/bridge/java/StreamTableEnvironment.java    |   7 +
 .../flink/table/catalog/FunctionCatalog.java       |  10 +-
 .../table/functions/FunctionDefinitionUtil.java    |  17 +-
 .../flink/table/catalog/FunctionCatalogTest.java   | 203 +++++++++------------
 .../functions/FunctionDefinitionUtilTest.java      |   8 +-
 .../functions/python/PythonScalarFunction.java     |  20 ++
 .../functions/python/PythonTableFunction.java      |  20 ++
 .../plan/nodes/common/CommonPythonBase.scala       |  12 +-
 .../table/planner/plan/utils/PythonUtil.scala      |  10 +-
 .../utils/JavaUserDefinedScalarFunctions.java      |  11 +-
 .../catalog/FunctionCatalogOperatorTable.java      |  30 +++
 .../expressions/PlannerExpressionConverter.scala   |  58 +++++-
 .../table/runtime/stream/sql/FunctionITCase.java   |  19 --
 .../table/runtime/stream/table/FunctionITCase.java | 103 -----------
 .../flink/table/runtime/batch/sql/CalcITCase.scala |   9 -
 17 files changed, 286 insertions(+), 290 deletions(-)
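
For readers skimming the Python-related hunks below: PythonScalarFunction and
PythonTableFunction opt into the new type inference by overriding
getTypeInference() and declaring their types explicitly instead of relying on
reflective extraction. A minimal standalone sketch of that pattern
(hypothetical function, assuming the 1.11-era type inference API):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.DataTypeFactory;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.table.types.inference.TypeInference;
    import org.apache.flink.table.types.inference.TypeStrategies;

    import java.util.Collections;

    public class ExplicitTypesFunction extends ScalarFunction {

            public Integer eval(String s) {
                    return s.length();
            }

            @Override
            public TypeInference getTypeInference(DataTypeFactory typeFactory) {
                    return TypeInference.newBuilder()
                            // pin the expected argument types
                            .typedArguments(Collections.singletonList(DataTypes.STRING()))
                            // pin the result type instead of extracting it reflectively
                            .outputTypeStrategy(TypeStrategies.explicit(DataTypes.INT()))
                            .build();
            }
    }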

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
index 78be7f5..fc94dd1 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
@@ -221,6 +221,15 @@ public class ExecutionEntry extends ConfigEntry {
                return false;
        }
 
+       public boolean isBlinkPlanner() {
+               final String planner = properties.getOptionalString(EXECUTION_PLANNER)
+                       .orElse(EXECUTION_PLANNER_VALUE_BLINK);
+               if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
+                       return false;
+               }
+               return true;
+       }
+
        public TimeCharacteristic getTimeCharacteristic() {
                return properties.getOptionalString(EXECUTION_TIME_CHARACTERISTIC)
                        .flatMap((v) -> {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 4f5e418..a13eab7 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -655,14 +655,28 @@ public class ExecutionContext<ClusterID> {
                if (tableEnv instanceof StreamTableEnvironment) {
                        StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
                        functions.forEach((k, v) -> {
-                               if (v instanceof ScalarFunction) {
-                                       streamTableEnvironment.registerFunction(k, (ScalarFunction) v);
-                               } else if (v instanceof AggregateFunction) {
-                                       streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
-                               } else if (v instanceof TableFunction) {
-                                       streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
-                               } else {
-                                       throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
+                               // Blink planner uses FLIP-65 functions for scalar and table functions
+                               // aggregate functions still use the old type inference
+                               if (environment.getExecution().isBlinkPlanner()) {
+                                       if (v instanceof ScalarFunction || v instanceof TableFunction) {
+                                               streamTableEnvironment.createTemporarySystemFunction(k, (UserDefinedFunction) v);
+                                       } else if (v instanceof AggregateFunction) {
+                                               streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
+                                       } else {
+                                               throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
+                                       }
+                               }
+                               // legacy
+                               else {
+                                       if (v instanceof ScalarFunction) {
+                                               streamTableEnvironment.registerFunction(k, (ScalarFunction) v);
+                                       } else if (v instanceof AggregateFunction) {
+                                               streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
+                                       } else if (v instanceof TableFunction) {
+                                               streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
+                                       } else {
+                                               throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
+                                       }
                                }
                        });
                } else {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
index b9d611e..6f80220 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
 
 /**
  * This table environment is the entry point and central context for creating Table and SQL
@@ -152,7 +153,13 @@ public interface StreamTableEnvironment extends TableEnvironment {
         * @param name The name under which the function is registered.
         * @param tableFunction The TableFunction to register.
         * @param <T> The type of the output row.
+        *
+        * @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead. Please
+        *             note that the new method also uses the new type system and reflective extraction logic. It
+        *             might be necessary to update the function implementation as well. See the documentation of
+        *             {@link TableFunction} for more information on the new function design.
         */
+       @Deprecated
        <T> void registerFunction(String name, TableFunction<T> tableFunction);
 
        /**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index a6ab333..38c4094 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -388,6 +388,10 @@ public final class FunctionCatalog {
                );
        }
 
+       /**
+        * @deprecated Use {@link #registerTemporarySystemFunction(String, FunctionDefinition, boolean)} instead.
+        */
+       @Deprecated
        public <T> void registerTempSystemTableFunction(
                        String name,
                        TableFunction<T> function,
@@ -639,9 +643,9 @@ public final class FunctionCatalog {
                        // directly.
                        return ((InlineCatalogFunction) function).getDefinition();
                }
-               // Currently the uninstantiated functions are all from sql and catalog that use the old type inference,
-               // so using FunctionDefinitionUtil to instantiate them and wrap them with `ScalarFunctionDefinition`,
-               // `TableFunctionDefinition`, etc. If the new type inference is fully functional, this should be
+               // Until all functions support the new type inference, uninstantiated functions from sql and
+               // catalog use the FunctionDefinitionUtil to instantiate them and wrap them with `AggregateFunctionDefinition`,
+               // `TableAggregateFunctionDefinition`. If the new type inference is fully functional, this should be
                // changed to use `UserDefinedFunctionHelper#instantiateFunction`.
                return FunctionDefinitionUtil.createFunctionDefinition(
                        name, function.getClassName(), function.getFunctionLanguage(), config);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
index 1df8830..de5d91d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
@@ -60,18 +60,11 @@ public class FunctionDefinitionUtil {
        }
 
        private static FunctionDefinition createFunctionDefinitionInternal(String name, UserDefinedFunction udf) {
-               if (udf instanceof ScalarFunction) {
-                       return new ScalarFunctionDefinition(
-                               name,
-                               (ScalarFunction) udf
-                       );
-               } else if (udf instanceof TableFunction) {
-                       TableFunction t = (TableFunction) udf;
-                       return new TableFunctionDefinition(
-                               name,
-                               t,
-                               UserDefinedFunctionHelper.getReturnTypeOfTableFunction(t)
-                       );
+               if (udf instanceof ScalarFunction || udf instanceof TableFunction) {
+                       // table and scalar function use the new type inference
+                       // once the other functions have been updated, this entire class will not be necessary
+                       // anymore and can be replaced with UserDefinedFunctionHelper.instantiateFunction
+                       return udf;
                } else if (udf instanceof AggregateFunction) {
                        AggregateFunction a = (AggregateFunction) udf;
 
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index 4f68d4c..d1654db 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -26,9 +26,7 @@ import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.utils.CatalogManagerMocks;
@@ -37,7 +35,6 @@ import org.hamcrest.Matcher;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
@@ -47,7 +44,6 @@ import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
 import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -59,24 +55,31 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
  */
 public class FunctionCatalogTest {
 
-       // TODO for now the resolution still returns the marker interfaces for functions
-       //  until we drop the old function stack in DDL
-
        private static final ScalarFunction FUNCTION_1 = new TestFunction1();
 
        private static final ScalarFunction FUNCTION_2 = new TestFunction2();
 
+       private static final ScalarFunction FUNCTION_3 = new TestFunction3();
+
+       private static final ScalarFunction FUNCTION_4 = new TestFunction4();
+
        private static final ScalarFunction FUNCTION_INVALID = new InvalidTestFunction();
 
+       private static final TableFunction<?> TABLE_FUNCTION = new TestTableFunction();
+
+       private static final AggregateFunction<?, ?> AGGREGATE_FUNCTION = new TestAggregateFunction();
+
        private static final String NAME = "test_function";
 
        private static final ObjectIdentifier IDENTIFIER = ObjectIdentifier.of(
                DEFAULT_CATALOG,
-               DEFAULT_DATABASE, NAME);
+               DEFAULT_DATABASE,
+               NAME);
 
        private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER = UnresolvedIdentifier.of(
                DEFAULT_CATALOG,
-               DEFAULT_DATABASE, NAME);
+               DEFAULT_DATABASE,
+               NAME);
 
        private static final UnresolvedIdentifier PARTIAL_UNRESOLVED_IDENTIFIER = UnresolvedIdentifier.of(NAME);
 
@@ -120,24 +123,23 @@ public class FunctionCatalogTest {
                // test catalog function is found
                catalog.createFunction(
                        IDENTIFIER.toObjectPath(),
-                       new CatalogFunctionImpl(TestFunction1.class.getName()),
+                       new CatalogFunctionImpl(FUNCTION_1.getClass().getName()),
                        false);
 
-               FunctionLookup.Result result = functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER).get();
-
-               assertEquals(Optional.of(IDENTIFIER), result.getFunctionIdentifier().getIdentifier());
-               assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction1);
+               assertThat(
+                       functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_1));
 
                // test temp catalog function is found
-               functionCatalog.registerTempCatalogScalarFunction(
-                       IDENTIFIER,
-                       new TestFunction2()
+               functionCatalog.registerTemporaryCatalogFunction(
+                       PARTIAL_UNRESOLVED_IDENTIFIER,
+                       FUNCTION_2,
+                       false
                );
 
-               result = functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER).get();
-
-               assertEquals(Optional.of(IDENTIFIER), result.getFunctionIdentifier().getIdentifier());
-               assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction2);
+               assertThat(
+                       functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_2));
        }
 
        @Test
@@ -148,40 +150,37 @@ public class FunctionCatalogTest {
                // test catalog function is found
                catalog.createFunction(
                        IDENTIFIER.toObjectPath(),
-                       new CatalogFunctionImpl(TestFunction1.class.getName()),
+                       new CatalogFunctionImpl(FUNCTION_1.getClass().getName()),
                        false);
 
-               FunctionLookup.Result result = functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER).get();
-
-               assertEquals(Optional.of(IDENTIFIER), result.getFunctionIdentifier().getIdentifier());
-               assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction1);
+               assertThat(
+                       functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_1));
 
-               // test temp catalog function is found
-               functionCatalog.registerTempCatalogScalarFunction(
-                       IDENTIFIER,
-                       new TestFunction2()
+               // test temporary catalog function is found
+               functionCatalog.registerTemporaryCatalogFunction(
+                       PARTIAL_UNRESOLVED_IDENTIFIER,
+                       FUNCTION_2,
+                       false
                );
 
-               result = functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER).get();
-
-               assertEquals(Optional.of(IDENTIFIER), result.getFunctionIdentifier().getIdentifier());
-               assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction2);
+               assertThat(
+                       functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_2));
 
                // test system function is found
                moduleManager.loadModule("test_module", new TestModule());
 
-               result = functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER).get();
-
-               assertEquals(Optional.of(NAME), result.getFunctionIdentifier().getSimpleName());
-               assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction3);
-
-               // test temp system function is found
-               functionCatalog.registerTempSystemScalarFunction(NAME, new TestFunction4());
+               assertThat(
+                       functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
+                       returnsFunction(FunctionIdentifier.of(NAME), FUNCTION_3));
 
-               result = functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER).get();
+               // test temporary system function is found
+               functionCatalog.registerTemporarySystemFunction(NAME, FUNCTION_4, false);
 
-               assertEquals(Optional.of(NAME), result.getFunctionIdentifier().getSimpleName());
-               assertTrue(((ScalarFunctionDefinition) result.getFunctionDefinition()).getScalarFunction() instanceof TestFunction4);
+               assertThat(
+                       functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
+                       returnsFunction(FunctionIdentifier.of(NAME), FUNCTION_4));
        }
 
        @Test
@@ -196,7 +195,8 @@ public class FunctionCatalogTest {
                        returnsFunction(FunctionIdentifier.of(NAME), FUNCTION_1));
 
                // register second time lenient
-               functionCatalog.registerTemporarySystemFunction(NAME,
+               functionCatalog.registerTemporarySystemFunction(
+                       NAME,
                        FUNCTION_2,
                        true);
                assertThat(
@@ -254,27 +254,27 @@ public class FunctionCatalogTest {
                // register first time
                functionCatalog.registerTemporarySystemFunction(
                        NAME,
-                       TestFunction1.class.getName(),
+                       FUNCTION_1.getClass().getName(),
                        FunctionLanguage.JAVA,
                        false);
                assertThat(
                        functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
-                       returnsFunction(FunctionIdentifier.of(NAME), new ScalarFunctionDefinition(NAME, FUNCTION_1)));
+                       returnsFunction(FunctionIdentifier.of(NAME), FUNCTION_1));
 
                // register second time lenient
                functionCatalog.registerTemporarySystemFunction(NAME,
-                       TestFunction2.class.getName(),
+                       FUNCTION_2.getClass().getName(),
                        FunctionLanguage.JAVA,
                        true);
                assertThat(
                        functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
-                       returnsFunction(FunctionIdentifier.of(NAME), new ScalarFunctionDefinition(NAME, FUNCTION_1)));
+                       returnsFunction(FunctionIdentifier.of(NAME), FUNCTION_1));
 
                // register second time not lenient
                try {
                        functionCatalog.registerTemporarySystemFunction(
                                NAME,
-                               TestFunction2.class.getName(),
+                               FUNCTION_2.getClass().getName(),
                                FunctionLanguage.JAVA,
                                false);
                        fail();
@@ -288,7 +288,7 @@ public class FunctionCatalogTest {
                try {
                        functionCatalog.registerTemporarySystemFunction(
                                NAME,
-                               InvalidTestFunction.class.getName(),
+                               FUNCTION_INVALID.getClass().getName(),
                                FunctionLanguage.JAVA,
                                false);
                        fail();
@@ -305,28 +305,29 @@ public class FunctionCatalogTest {
                // test register uninstantiated table function
                functionCatalog.registerTemporarySystemFunction(
                        NAME,
-                       TestTableFunction1.class.getName(),
+                       TABLE_FUNCTION.getClass().getName(),
                        FunctionLanguage.JAVA,
                        false);
                assertThat(
                        functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
                        returnsFunction(
                                FunctionIdentifier.of(NAME),
-                               new TableFunctionDefinition(NAME, new TestTableFunction1(), Types.STRING)));
+                               TABLE_FUNCTION));
 
                functionCatalog.dropTemporarySystemFunction(NAME, true);
 
                // test register uninstantiated aggregate function
                functionCatalog.registerTemporarySystemFunction(
                        NAME,
-                       TestAggregateFunction1.class.getName(),
+                       AGGREGATE_FUNCTION.getClass().getName(),
                        FunctionLanguage.JAVA,
                        false);
                assertThat(
                        functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
                        returnsFunction(
                                FunctionIdentifier.of(NAME),
-                               new AggregateFunctionDefinition(NAME, new TestAggregateFunction1(), Types.STRING, Types.STRING)));
+                               // TODO aggregate functions still use marker interface
+                               new AggregateFunctionDefinition(NAME, AGGREGATE_FUNCTION, Types.STRING, Types.STRING)));
        }
 
        @Test
@@ -338,9 +339,7 @@ public class FunctionCatalogTest {
                        false);
                assertThat(
                        functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-                       returnsFunction(
-                               FunctionIdentifier.of(IDENTIFIER),
-                               new ScalarFunctionDefinition(NAME, FUNCTION_1)));
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_1));
 
                // register second time lenient
                functionCatalog.registerCatalogFunction(
@@ -349,9 +348,7 @@ public class FunctionCatalogTest {
                        true);
                assertThat(
                        functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-                       returnsFunction(
-                               FunctionIdentifier.of(IDENTIFIER),
-                               new ScalarFunctionDefinition(NAME, FUNCTION_1)));
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_1));
 
                // register second time not lenient
                try {
@@ -414,9 +411,7 @@ public class FunctionCatalogTest {
                        false);
                assertThat(
                        functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-                       returnsFunction(
-                               FunctionIdentifier.of(IDENTIFIER),
-                               new ScalarFunctionDefinition(NAME, FUNCTION_2)));
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_2));
 
                // register temporary first time
                functionCatalog.registerTemporaryCatalogFunction(
@@ -488,7 +483,7 @@ public class FunctionCatalogTest {
                        functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
                        returnsFunction(
                                FunctionIdentifier.of(IDENTIFIER),
-                               new ScalarFunctionDefinition(NAME, FUNCTION_2))); // permanent function is visible again
+                               FUNCTION_2)); // permanent function is visible again
 
                // drop temporary second time lenient
                assertThat(
@@ -530,9 +525,7 @@ public class FunctionCatalogTest {
                        false);
                assertThat(
                        functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-                       returnsFunction(
-                               FunctionIdentifier.of(IDENTIFIER),
-                               new ScalarFunctionDefinition(NAME, FUNCTION_2)));
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), FUNCTION_2));
 
                // register temporary first time
                functionCatalog.registerTemporaryCatalogFunction(
@@ -542,9 +535,7 @@ public class FunctionCatalogTest {
                // temporary function hides catalog function
                assertThat(
                        
functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-                       returnsFunction(
-                               FunctionIdentifier.of(IDENTIFIER),
-                               new 
ScalarFunctionDefinition(IDENTIFIER.getObjectName(), FUNCTION_1)));
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_1));
 
                // register temporary second time lenient
                functionCatalog.registerTemporaryCatalogFunction(
@@ -553,9 +544,7 @@ public class FunctionCatalogTest {
                        true);
                assertThat(
                        
functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER),
-                       returnsFunction(
-                               FunctionIdentifier.of(IDENTIFIER),
-                               new 
ScalarFunctionDefinition(IDENTIFIER.getObjectName(), FUNCTION_1)));
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_1));
 
                // register temporary second time not lenient
                try {
@@ -590,58 +579,25 @@ public class FunctionCatalogTest {
                // test register uninstantiated table function
                functionCatalog.registerTemporaryCatalogFunction(
                        PARTIAL_UNRESOLVED_IDENTIFIER,
-                       new CatalogFunctionImpl(TestTableFunction1.class.getName()),
+                       new CatalogFunctionImpl(TABLE_FUNCTION.getClass().getName()),
                        false);
                assertThat(
+                       functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
-                       returnsFunction(
-                               FunctionIdentifier.of(IDENTIFIER),
-                               new TableFunctionDefinition(NAME, new TestTableFunction1(), Types.STRING)));
+                       returnsFunction(FunctionIdentifier.of(IDENTIFIER), TABLE_FUNCTION));
 
                functionCatalog.dropTemporaryCatalogFunction(PARTIAL_UNRESOLVED_IDENTIFIER, true);
 
                // test register uninstantiated aggregate function
                functionCatalog.registerTemporaryCatalogFunction(
                        PARTIAL_UNRESOLVED_IDENTIFIER,
-                       new CatalogFunctionImpl(TestAggregateFunction1.class.getName()),
+                       new CatalogFunctionImpl(AGGREGATE_FUNCTION.getClass().getName()),
                        false);
                assertThat(
+                       functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER),
                        returnsFunction(
                                FunctionIdentifier.of(IDENTIFIER),
-                               new AggregateFunctionDefinition(NAME, new TestAggregateFunction1(), Types.STRING, Types.STRING)));
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // Legacy function handling before FLIP-65
-       // --------------------------------------------------------------------------------------------
-
-       @Test
-       public void testRegisterAndDropTempSystemFunction() {
-               assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(NAME));
-
-               functionCatalog.registerTempSystemScalarFunction(NAME, new TestFunction1());
-               assertTrue(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(NAME));
-
-               functionCatalog.dropTemporarySystemFunction(NAME, false);
-               assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(NAME));
-
-               functionCatalog.dropTemporarySystemFunction(NAME, true);
-               assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(NAME));
-       }
-
-       @Test
-       public void testRegisterAndDropTempCatalogFunction() {
-               assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(NAME));
-
-               functionCatalog.registerTempCatalogScalarFunction(IDENTIFIER, new TestFunction1());
-               assertTrue(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(IDENTIFIER.getObjectName()));
-
-               functionCatalog.dropTempCatalogFunction(IDENTIFIER, false);
-               assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(IDENTIFIER.getObjectName()));
-
-               functionCatalog.dropTempCatalogFunction(IDENTIFIER, true);
-               assertFalse(Arrays.asList(functionCatalog.getUserDefinedFunctions()).contains(IDENTIFIER.getObjectName()));
+                               // TODO aggregate functions still use marker interface
+                               new AggregateFunctionDefinition(NAME, AGGREGATE_FUNCTION, Types.STRING, Types.STRING)));
        }
 
        // --------------------------------------------------------------------------------------------
@@ -673,7 +629,7 @@ public class FunctionCatalogTest {
 
                @Override
                public Optional<FunctionDefinition> getFunctionDefinition(String name) {
-                       return Optional.of(new ScalarFunctionDefinition(NAME, new TestFunction3()));
+                       return Optional.of(FUNCTION_3);
                }
        }
 
@@ -698,6 +654,11 @@ public class FunctionCatalogTest {
                public String eval(){
                        return null;
                }
+
+               @Override
+               public boolean equals(Object o) {
+                       return o != null && o.getClass() == this.getClass();
+               }
        }
 
        /**
@@ -707,6 +668,11 @@ public class FunctionCatalogTest {
                public String eval(){
                        return null;
                }
+
+               @Override
+               public boolean equals(Object o) {
+                       return o != null && o.getClass() == this.getClass();
+               }
        }
 
        /**
@@ -716,6 +682,11 @@ public class FunctionCatalogTest {
                public String eval(){
                        return null;
                }
+
+               @Override
+               public boolean equals(Object o) {
+                       return o != null && o.getClass() == this.getClass();
+               }
        }
 
        /**
@@ -728,7 +699,8 @@ public class FunctionCatalogTest {
        /**
         * Testing table function.
         */
-       public static class TestTableFunction1 extends TableFunction<String> {
+       @SuppressWarnings("unused")
+       public static class TestTableFunction extends TableFunction<String> {
                public void eval(String in) {}
 
                @Override
@@ -740,7 +712,8 @@ public class FunctionCatalogTest {
        /**
         * Testing aggregate function.
         */
-       public static class TestAggregateFunction1 extends AggregateFunction<String, String> {
+       @SuppressWarnings("unused")
+       public static class TestAggregateFunction extends AggregateFunction<String, String> {
 
                @Override
                public String getValue(String accumulator) {
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
index 83497a8..8fc6e40 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
@@ -37,7 +37,7 @@ public class FunctionDefinitionUtilTest {
                                TestScalarFunction.class.getName()
                );
 
-               assertTrue(((ScalarFunctionDefinition) fd).getScalarFunction() instanceof TestScalarFunction);
+               assertTrue(fd instanceof TestScalarFunction);
        }
 
        @Test
@@ -47,14 +47,14 @@ public class FunctionDefinitionUtilTest {
                        TestTableFunction.class.getName()
                );
 
-               assertTrue(((TableFunctionDefinition) fd1).getTableFunction() instanceof TestTableFunction);
+               assertTrue(fd1 instanceof TestTableFunction);
 
                FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
                                "test",
                                TestTableFunctionWithoutResultType.class.getName()
                );
 
-               assertTrue(((TableFunctionDefinition) fd2).getTableFunction() instanceof TestTableFunctionWithoutResultType);
+               assertTrue(fd2 instanceof TestTableFunctionWithoutResultType);
        }
 
        @Test
@@ -64,6 +64,7 @@ public class FunctionDefinitionUtilTest {
                        TestAggFunction.class.getName()
                );
 
+               // TODO aggregate functions still use marker interface
                assertTrue(((AggregateFunctionDefinition) fd1).getAggregateFunction() instanceof TestAggFunction);
 
                FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
@@ -85,6 +86,7 @@ public class FunctionDefinitionUtilTest {
                        TestTableAggFunction.class.getName()
                );
 
+               // TODO table aggregate functions still use marker interface
                assertTrue(((TableAggregateFunctionDefinition) fd1).getTableAggregateFunction() instanceof TestTableAggFunction);
 
                FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
index db2480c..72df5e3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
@@ -20,7 +20,16 @@ package org.apache.flink.table.functions.python;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * The wrapper of user defined python scalar function.
@@ -91,6 +100,17 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
        }
 
        @Override
+       public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+               final List<DataType> argumentDataTypes = Stream.of(inputTypes)
+                       .map(TypeConversions::fromLegacyInfoToDataType)
+                       .collect(Collectors.toList());
+               return TypeInference.newBuilder()
+                       .typedArguments(argumentDataTypes)
+                       .outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType)))
+                       .build();
+       }
+
+       @Override
        public String toString() {
                return name;
        }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
index 2244375..3560170 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
@@ -21,9 +21,18 @@ package org.apache.flink.table.functions.python;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 /**
  * The wrapper of user defined python table function.
  */
@@ -93,6 +102,17 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
        }
 
        @Override
+       public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+               final List<DataType> argumentDataTypes = Stream.of(inputTypes)
+                       .map(TypeConversions::fromLegacyInfoToDataType)
+                       .collect(Collectors.toList());
+               return TypeInference.newBuilder()
+                       .typedArguments(argumentDataTypes)
+                       .outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType)))
+                       .build();
+       }
+
+       @Override
        public String toString() {
                return name;
        }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala
index eba8eab1..fc96c57 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala
@@ -23,13 +23,14 @@ import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.functions.UserDefinedFunction
+import org.apache.flink.table.functions.FunctionDefinition
 import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionInfo}
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.utils.{ScalarSqlFunction, TableSqlFunction}
 import org.apache.flink.table.planner.utils.DummyStreamExecutionEnvironment
 
-import scala.collection.mutable
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 trait CommonPythonBase {
 
@@ -50,7 +51,8 @@ trait CommonPythonBase {
   private def createPythonFunctionInfo(
       pythonRexCall: RexCall,
       inputNodes: mutable.Map[RexNode, Integer],
-      func: UserDefinedFunction): PythonFunctionInfo = {
+      functionDefinition: FunctionDefinition)
+    : PythonFunctionInfo = {
     val inputs = new mutable.ArrayBuffer[AnyRef]()
     pythonRexCall.getOperands.foreach {
       case pythonRexCall: RexCall =>
@@ -73,7 +75,7 @@ trait CommonPythonBase {
         }
     }
 
-    new PythonFunctionInfo(func.asInstanceOf[PythonFunction], inputs.toArray)
+    new PythonFunctionInfo(functionDefinition.asInstanceOf[PythonFunction], inputs.toArray)
   }
 
   protected def createPythonFunctionInfo(
@@ -84,6 +86,8 @@ trait CommonPythonBase {
         createPythonFunctionInfo(pythonRexCall, inputNodes, sfc.scalarFunction)
       case tfc: TableSqlFunction =>
         createPythonFunctionInfo(pythonRexCall, inputNodes, tfc.udtf)
+      case bsf: BridgingSqlFunction =>
+        createPythonFunctionInfo(pythonRexCall, inputNodes, bsf.getDefinition)
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/PythonUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/PythonUtil.scala
index d924e0e..b89410b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/PythonUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/PythonUtil.scala
@@ -19,8 +19,9 @@
 package org.apache.flink.table.planner.plan.utils
 
 import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.flink.table.functions.UserDefinedFunction
+import org.apache.flink.table.functions.FunctionDefinition
 import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionKind}
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.utils.{ScalarSqlFunction, TableSqlFunction}
 
 import scala.collection.JavaConversions._
@@ -91,13 +92,14 @@ object PythonUtil {
       rexCall.getOperator match {
         case sfc: ScalarSqlFunction => isPythonFunction(sfc.scalarFunction)
         case tfc: TableSqlFunction => isPythonFunction(tfc.udtf)
+        case bsf: BridgingSqlFunction => isPythonFunction(bsf.getDefinition)
         case _ => false
     }
 
-    private def isPythonFunction(userDefinedFunction: UserDefinedFunction): Boolean = {
-      userDefinedFunction.isInstanceOf[PythonFunction] &&
+    private def isPythonFunction(functionDefinition: FunctionDefinition): Boolean = {
+      functionDefinition.isInstanceOf[PythonFunction] &&
         (pythonFunctionKind.isEmpty ||
-          userDefinedFunction.asInstanceOf[PythonFunction].getPythonFunctionKind ==
+          functionDefinition.asInstanceOf[PythonFunction].getPythonFunctionKind ==
             pythonFunctionKind.get)
     }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
index 0c9bdef..1c86fc4 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.planner.runtime.utils;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionContext;
@@ -140,7 +140,9 @@ public class JavaUserDefinedScalarFunctions {
                        openCalled = true;
                }
 
-               public Timestamp eval(TimestampData timestampData, Integer offset) {
+               public @DataTypeHint("TIMESTAMP(3)") Timestamp eval(
+                               @DataTypeHint("TIMESTAMP(3)") TimestampData timestampData,
+                               Integer offset) {
                        if (!openCalled) {
                                fail("Open was not called before run.");
                        }
@@ -154,11 +156,6 @@ public class JavaUserDefinedScalarFunctions {
                }
 
                @Override
-               public TypeInformation<?> getResultType(Class<?>[] signature) {
-                       return Types.SQL_TIMESTAMP;
-               }
-
-               @Override
                public void close() {
                        closeCalled = true;
                }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
index 7bd7a31..2d0a063 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
@@ -24,8 +24,11 @@ import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.TableFunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils;
 
 import org.apache.calcite.sql.SqlFunction;
@@ -35,6 +38,8 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.SqlSyntax;
 import org.apache.calcite.sql.validate.SqlNameMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Optional;
@@ -45,6 +50,8 @@ import java.util.Optional;
 @Internal
 public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 
+       private static final Logger LOG = LoggerFactory.getLogger(FunctionCatalogOperatorTable.class);
+
        private final FunctionCatalog functionCatalog;
        private final FlinkTypeFactory typeFactory;
 
@@ -100,6 +107,29 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
                } else if (functionDefinition instanceof BuiltInFunctionDefinition) {
                        return Optional.empty();
                }
+               LOG.warn(
+                       "The new type inference for functions is only supported in the Blink planner. " +
+                               "Falling back to legacy type inference for function '{}'.",
+                       functionDefinition.getClass().toString());
+               if (functionDefinition instanceof ScalarFunction) {
+                       return convertToSqlFunction(
+                               category,
+                               name,
+                               new ScalarFunctionDefinition(
+                                       name,
+                                       (ScalarFunction) functionDefinition)
+                       );
+               } else if (functionDefinition instanceof TableFunction) {
+                       final TableFunction<?> t = (TableFunction<?>) functionDefinition;
+                       return convertToSqlFunction(
+                               category,
+                               name,
+                               new TableFunctionDefinition(
+                                       name,
+                                       t,
+                                       UserDefinedFunctionHelper.getReturnTypeOfTableFunction(t))
+                       );
+               }
                throw new TableException(
                        "The new type inference for functions is only supported 
in the Blink planner.");
        }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 7921c74..86fc0eb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -26,15 +26,18 @@ import org.apache.flink.table.functions._
 import org.apache.flink.table.types.logical.LogicalTypeRoot.SYMBOL
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
 import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
-
 import java.time.{LocalDate, LocalDateTime}
 
+import org.apache.flink.table.util.Logging
+
 import _root_.scala.collection.JavaConverters._
 
 /**
  * Visitor implementation for converting [[Expression]]s to [[PlannerExpression]]s.
  */
-class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExpression] {
+class PlannerExpressionConverter private
+  extends ApiExpressionVisitor[PlannerExpression]
+  with Logging {
 
   override def visit(call: CallExpression): PlannerExpression = {
     translateCall(call.getFunctionDefinition, call.getChildren.asScala)
@@ -84,6 +87,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
     val args = children.map(_.accept(this))
 
     func match {
+      // explicit legacy
       case sfd: ScalarFunctionDefinition =>
         val call = PlannerScalarFunctionCall(
           sfd.getScalarFunction,
@@ -92,6 +96,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         call.validateInput()
         call
 
+      // explicit legacy
       case tfd: TableFunctionDefinition =>
         PlannerTableFunctionCall(
           tfd.toString,
@@ -99,6 +104,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
           args,
           tfd.getResultType)
 
+      // explicit legacy
       case afd: AggregateFunctionDefinition =>
         AggFunctionCall(
           afd.getAggregateFunction,
@@ -106,6 +112,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
           afd.getAccumulatorTypeInfo,
           args)
 
+      // explicit legacy
       case tafd: TableAggregateFunctionDefinition =>
         AggFunctionCall(
           tafd.getTableAggregateFunction,
@@ -113,6 +120,51 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
           tafd.getAccumulatorTypeInfo,
           args)
 
+      // best-effort support for new type inference
+      case sf: ScalarFunction =>
+        LOG.warn(
+          "The new type inference for functions is only supported in the Blink planner. " +
+           "Falling back to legacy type inference for function '{}'.", sf.getClass)
+        val call = PlannerScalarFunctionCall(
+          sf,
+          args)
+        // it configures underlying state
+        call.validateInput()
+        call
+
+      // best-effort support for new type inference
+      case tf: TableFunction[_] =>
+        LOG.warn(
+          "The new type inference for functions is only supported in the Blink planner. " +
+           "Falling back to legacy type inference for function '{}'.", tf.getClass)
+        PlannerTableFunctionCall(
+          tf.toString,
+          tf,
+          args,
+          UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tf))
+
+      // best-effort support for new type inference
+      case af: AggregateFunction[_, _] =>
+        LOG.warn(
+          "The new type inference for functions is only supported in the Blink planner. " +
+           "Falling back to legacy type inference for function '{}'.", af.getClass)
+        AggFunctionCall(
+          af,
+          UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(af),
+          UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(af),
+          args)
+
+      // best-effort support for new type inference
+      case taf: TableAggregateFunction[_, _] =>
+        LOG.warn(
+          "The new type inference for functions is only supported in the Blink planner. " +
+           "Falling back to legacy type inference for function '{}'.", taf.getClass)
+        AggFunctionCall(
+          taf,
+          UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(taf),
+          UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(taf),
+          args)
+
       case _ : UserDefinedFunction =>
         throw new ValidationException(
           "The new type inference for functions is only supported in the Blink 
planner.")
@@ -442,7 +494,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
             assert(args.size == 1)
             Ln(args.head)
 
-          case LOG =>
+          case BuiltInFunctionDefinitions.LOG =>
             assert(args.size == 1 || args.size == 2)
             if (args.size == 1) {
               Log(args.head)
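
For the legacy planner, the cases above turn a hard failure into a best-effort
fallback: a function that only supports the new type inference is now translated
through the legacy PlannerScalarFunctionCall / PlannerTableFunctionCall /
AggFunctionCall machinery, and a warning is logged instead of throwing. The
change from "case LOG =>" to "case BuiltInFunctionDefinitions.LOG =>" is needed
because the newly mixed-in Logging trait defines a LOG member that would
otherwise shadow the statically imported built-in definition. A minimal,
hypothetical sketch of what the fallback enables (the function and names below
are illustrative and not part of this patch):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.types.Row;

    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;

    public class FallbackSketch {

        // Illustrative scalar function; once the planner falls back, its types
        // are derived by the legacy reflective extraction.
        public static class MyUpper extends ScalarFunction {
            public String eval(String s) {
                return s.toUpperCase();
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

            // Previously rejected with a ValidationException; with this patch the
            // old planner should log the warning above and keep going.
            Table table = tEnv
                .sqlQuery("SELECT * FROM (VALUES ('a')) AS T(f0)")
                .select(call(new MyUpper(), $("f0")));
            tEnv.toAppendStream(table, Row.class).print();
            env.execute();
        }
    }
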
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
index 2ca2135..f466472 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.stream.sql;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -451,24 +450,6 @@ public class FunctionITCase extends AbstractTestBase {
                tableEnv.sqlUpdate("drop table t2");
        }
 
-       @Test
-       public void testDataTypeBasedTypeInferenceNotSupported() throws Exception {
-               thrown.expect(ValidationException.class);
-               thrown.expectMessage("The new type inference for functions is only supported in the Blink planner.");
-
-               StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-               EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
-               StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
-                               streamExecEnvironment, settings);
-
-               tableEnvironment.createTemporarySystemFunction("func", SimpleScalarFunction.class);
-               Table table = tableEnvironment
-                       .sqlQuery("SELECT func(1)");
-               tableEnvironment.toAppendStream(table, Row.class).print();
-
-               streamExecEnvironment.execute();
-       }
-
        /**
         * Simple scalar function.
         */
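
The test removed above pinned the old behaviour of the SQL path: a
ValidationException for data-type-based inference on the old planner. With the
best-effort fallback, the same program is now expected to succeed and merely
log a warning. A hypothetical sketch, reusing SimpleScalarFunction and the
imports already present in this file:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

    // Catalog registration now goes through the new inference stack; the old
    // planner falls back to legacy inference during planning.
    tEnv.createTemporarySystemFunction("func", SimpleScalarFunction.class);
    tEnv.toAppendStream(tEnv.sqlQuery("SELECT func(1)"), Row.class).print();
    env.execute();
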
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java
deleted file mode 100644
index 509005d..0000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.stream.table;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.annotation.FunctionHint;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.types.Row;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-
-/**
- * Tests for user defined functions in the Table API.
- */
-public class FunctionITCase extends AbstractTestBase {
-
-       @Rule
-       public ExpectedException thrown = ExpectedException.none();
-
-       @Test
-       public void testDataTypeBasedTypeInferenceNotSupported() throws Exception {
-               thrown.expect(ValidationException.class);
-               thrown.expectMessage("The new type inference for functions is only supported in the Blink planner.");
-
-               StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-               EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
-               StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
-                               streamExecEnvironment, settings);
-
-               Table table = tableEnvironment
-                       .sqlQuery("SELECT * FROM (VALUES (1)) AS TableName(f0)")
-                       .select(call(new SimpleScalarFunction(), $("f0")));
-               tableEnvironment.toAppendStream(table, Row.class).print();
-
-               streamExecEnvironment.execute();
-       }
-
-       @Test
-       public void testDataTypeBasedTypeInferenceNotSupportedInLateralJoin() throws Exception {
-               thrown.expect(ValidationException.class);
-               thrown.expectMessage("The new type inference for functions is only supported in the Blink planner.");
-
-               StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-               EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
-               StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
-                               streamExecEnvironment, settings);
-
-               Table table = tableEnvironment
-                       .sqlQuery("SELECT * FROM (VALUES ('A,B,C')) AS TableName(f0)")
-                       .joinLateral(call(new SimpleTableFunction(), $("f0")).as("a", "b"))
-                       .select($("a"), $("b"));
-               tableEnvironment.toAppendStream(table, Row.class).print();
-
-               streamExecEnvironment.execute();
-       }
-
-       /**
-        * Simple scalar function.
-        */
-       public static class SimpleScalarFunction extends ScalarFunction {
-               public long eval(Integer i) {
-                       return i;
-               }
-       }
-
-       /**
-        * Simple table function.
-        */
-       @FunctionHint(output = @DataTypeHint("ROW<s STRING, sa ARRAY<STRING>>"))
-       public static class SimpleTableFunction extends TableFunction<Row> {
-               public void eval(String s) {
-                       // no-op
-               }
-       }
-}
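
Both tests in this deleted file asserted the pre-patch rejection on the old
planner, which the fallback makes obsolete. On the Blink planner the new
inference is fully supported; a hypothetical counterpart, reusing the
SimpleTableFunction defined in the deleted file (settings illustrative):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // The @FunctionHint on SimpleTableFunction drives the new, data-type-based
    // inference; no ValidationException and no fallback warning here.
    Table table = tEnv
        .sqlQuery("SELECT * FROM (VALUES ('A,B,C')) AS TableName(f0)")
        .joinLateral(call(new SimpleTableFunction(), $("f0")).as("a", "b"))
        .select($("a"), $("b"));
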
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 3bcaeac..10bb2f4 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -393,15 +393,6 @@ class CalcITCase(
     val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n")
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
-
-  // new type inference for functions is only supported in the Blink planner
-  @Test(expected = classOf[ValidationException])
-  def testUnsupportedNewFunctionTypeInference(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = BatchTableEnvironment.create(env)
-    tEnv.createTemporarySystemFunction("testFunc", new Func13(">>"))
-    tEnv.sqlQuery("SELECT testFunc('fail')").toDataSet[Row]
-  }
 }
 
 object MyHashCode extends ScalarFunction {
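
The deleted batch test expected the same ValidationException from the legacy
batch environment. After this patch the registration should instead trigger the
fallback warning and run with legacy type inference. A hypothetical Java
equivalent (Func13 is the test function referenced by the deleted test):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

    // Previously threw "The new type inference for functions is only supported
    // in the Blink planner."; now it should log the fallback warning instead.
    tEnv.createTemporarySystemFunction("testFunc", new Func13(">>"));
    Table result = tEnv.sqlQuery("SELECT testFunc('fail')");
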
