This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ec40d8528621d7533565261b8e46de91a14c40ee
Author: Timo Walther <[email protected]>
AuthorDate: Mon Dec 9 17:34:12 2019 +0100

    [FLINK-12283][table] Relax UDF constraints by using the ClosureCleaner
    
    This relaxes the UDF constraints as much as possible by using the
    ClosureCleaner. For Java users, this is very convenient. For Scala users,
    it improves the current situation, but we still need to fix FLINK-15162.
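
    As an illustration (a hypothetical sketch, not code from this change),
    a public but non-static inner class can now be used as a function,
    because the ClosureCleaner removes the implicit reference to the
    enclosing instance before the function is serialized:

        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.table.functions.ScalarFunction;

        public class MyPipeline {

            // Non-static inner class: it holds a hidden reference to the
            // enclosing MyPipeline instance; without cleaning, this can
            // make the function non-serializable.
            public class InnerUpper extends ScalarFunction {
                public String eval(String value) {
                    return value.toUpperCase();
                }
            }

            public void register(TableEnvironment tEnv) {
                // Validation and cleaning happen during registration.
                tEnv.registerFunction("innerUpper", new InnerUpper());
            }
        }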
    
    It ensures that validation and cleaning happen at exactly two locations:
    either during registration in the function catalog or during the
    resolution of inline, unregistered functions.
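
    The cleaning step honors the closure cleaner level that
    UserDefinedFunctionHelper#cleanFunction reads from the table
    configuration via PipelineOptions.CLOSURE_CLEANER_LEVEL. A sketch of
    how a user can tune it (assuming a TableEnvironment named tEnv):

        TableConfig config = tEnv.getConfig();
        // NONE disables cleaning, TOP_LEVEL cleans only the function object
        // itself, RECURSIVE (the default) also cleans transitively
        // referenced fields.
        config.getConfiguration().setString("pipeline.closure-cleaner-level", "TOP_LEVEL");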
    
    This closes #10519.
---
 .../client/gateway/local/ExecutionContext.java     | 27 ++++---
 .../java/internal/StreamTableEnvironmentImpl.java  |  2 +-
 .../internal/StreamTableEnvironmentImplTest.java   |  5 +-
 .../table/api/internal/TableEnvironmentImpl.java   |  6 +-
 .../flink/table/catalog/FunctionCatalog.java       | 36 ++++-----
 .../expressions/resolver/ExpressionResolver.java   | 18 ++++-
 .../resolver/rules/ResolveCallByArgumentsRule.java | 50 +++++++++++-
 .../expressions/resolver/rules/ResolverRule.java   |  6 ++
 .../table/functions/UserDefinedFunctionHelper.java | 40 ++++++++--
 .../operations/utils/OperationTreeBuilder.java     | 36 +++++++--
 .../flink/table/catalog/FunctionCatalogTest.java   |  2 +
 .../flink/table/utils/TableEnvironmentMock.java    | 12 ++-
 .../internal/StreamTableEnvironmentImpl.scala      |  2 +-
 .../internal/StreamTableEnvironmentImplTest.scala  |  5 +-
 .../functions/AggregateFunctionDefinition.java     |  4 +
 .../table/functions/ScalarFunctionDefinition.java  |  4 +
 .../TableAggregateFunctionDefinition.java          |  4 +
 .../table/functions/TableFunctionDefinition.java   |  4 +
 .../flink/table/functions/UserDefinedFunction.java |  2 +-
 .../expressions/CallExpressionResolver.java        |  1 +
 .../planner/codegen/CodeGeneratorContext.scala     |  2 +-
 .../flink/table/planner/expressions/call.scala     |  4 -
 .../operations/SqlToOperationConverterTest.java    |  5 +-
 .../codegen/WatermarkGeneratorCodeGenTest.scala    |  5 +-
 .../metadata/AggCallSelectivityEstimatorTest.scala |  3 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  2 +-
 .../plan/metadata/SelectivityEstimatorTest.scala   |  2 +-
 .../planner/plan/utils/RexNodeExtractorTest.scala  |  7 +-
 .../planner/runtime/stream/table/CalcITCase.scala  | 77 ++++++++++++++++++-
 .../flink/table/planner/utils/TableTestBase.scala  |  2 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |  3 +-
 .../apache/flink/table/codegen/CodeGenerator.scala |  2 +-
 .../org/apache/flink/table/expressions/call.scala  |  4 -
 .../PushFilterIntoTableSourceScanRule.scala        |  4 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |  5 +-
 .../api/stream/StreamTableEnvironmentTest.scala    |  2 +-
 .../flink/table/api/stream/sql/AggregateTest.scala |  5 +-
 .../flink/table/plan/RexProgramExtractorTest.scala |  2 +
 .../table/runtime/stream/table/CalcITCase.scala    | 88 ++++++++++++++++++++++
 .../apache/flink/table/utils/TableTestBase.scala   |  2 +-
 40 files changed, 405 insertions(+), 87 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 26f5322..8a1f082 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -393,13 +393,12 @@ public class ExecutionContext<ClusterID> {
        private static TableEnvironment createStreamTableEnvironment(
                        StreamExecutionEnvironment env,
                        EnvironmentSettings settings,
+                       TableConfig config,
                        Executor executor,
                        CatalogManager catalogManager,
                        ModuleManager moduleManager,
                        FunctionCatalog functionCatalog) {
 
-               final TableConfig config = TableConfig.getDefault();
-
                final Map<String, String> plannerProperties = settings.toPlannerProperties();
                final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
                        .create(plannerProperties, executor, config, functionCatalog, catalogManager);
@@ -441,6 +440,10 @@ public class ExecutionContext<ClusterID> {
                        //--------------------------------------------------------------------------------------------------------------
                        // Step.1 Create environments
                        //--------------------------------------------------------------------------------------------------------------
+                       // Step 1.0 Initialize the table configuration.
+                       final TableConfig config = new TableConfig();
+                       environment.getConfiguration().asMap().forEach((k, v) ->
+                                       config.getConfiguration().setString(k, v));
                        // Step 1.1 Initialize the CatalogManager if required.
                        final CatalogManager catalogManager = new CatalogManager(
                                        settings.getBuiltInCatalogName(),
@@ -450,12 +453,12 @@ public class ExecutionContext<ClusterID> {
                        // Step 1.2 Initialize the ModuleManager if required.
                        final ModuleManager moduleManager = new ModuleManager();
                        // Step 1.3 Initialize the FunctionCatalog if required.
-                       final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
+                       final FunctionCatalog functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager);
                        // Step 1.4 Set up session state.
-                       this.sessionState = SessionState.of(catalogManager, moduleManager, functionCatalog);
+                       this.sessionState = SessionState.of(config, catalogManager, moduleManager, functionCatalog);
 
                        // Must initialize the table environment before actually the
-                       createTableEnvironment(settings, catalogManager, moduleManager, functionCatalog);
+                       createTableEnvironment(settings, config, catalogManager, moduleManager, functionCatalog);
 
                        //--------------------------------------------------------------------------------------------------------------
                        // Step.2 Create modules and load them into the TableEnvironment.
@@ -487,6 +490,7 @@ public class ExecutionContext<ClusterID> {
                        this.sessionState = sessionState;
                        createTableEnvironment(
                                        settings,
+                                       sessionState.config,
                                        sessionState.catalogManager,
                                        sessionState.moduleManager,
                                        sessionState.functionCatalog);
@@ -495,6 +499,7 @@ public class ExecutionContext<ClusterID> {
 
        private void createTableEnvironment(
                        EnvironmentSettings settings,
+                       TableConfig config,
                        CatalogManager catalogManager,
                        ModuleManager moduleManager,
                        FunctionCatalog functionCatalog) {
@@ -507,6 +512,7 @@ public class ExecutionContext<ClusterID> {
                        tableEnv = createStreamTableEnvironment(
                                        streamExecEnv,
                                        settings,
+                                       config,
                                        executor,
                                        catalogManager,
                                        moduleManager,
@@ -517,15 +523,12 @@ public class ExecutionContext<ClusterID> {
                        executor = null;
                        tableEnv = new BatchTableEnvironmentImpl(
                                        execEnv,
-                                       TableConfig.getDefault(),
+                                       config,
                                        catalogManager,
                                        moduleManager);
                } else {
                        throw new SqlExecutionException("Unsupported execution type specified.");
                }
-               // set table configuration
-               environment.getConfiguration().asMap().forEach((k, v) ->
-                               tableEnv.getConfig().getConfiguration().setString(k, v));
        }
 
        private void initializeCatalogs() {
@@ -760,24 +763,28 @@ public class ExecutionContext<ClusterID> {
 
        /** Represents the state that should be reused in one session. **/
        public static class SessionState {
+               public final TableConfig config;
                public final CatalogManager catalogManager;
                public final ModuleManager moduleManager;
                public final FunctionCatalog functionCatalog;
 
                private SessionState(
+                               TableConfig config,
                                CatalogManager catalogManager,
                                ModuleManager moduleManager,
                                FunctionCatalog functionCatalog) {
+                       this.config = config;
                        this.catalogManager = catalogManager;
                        this.moduleManager = moduleManager;
                        this.functionCatalog = functionCatalog;
                }
 
                public static SessionState of(
+                               TableConfig config,
                                CatalogManager catalogManager,
                                ModuleManager moduleManager,
                                FunctionCatalog functionCatalog) {
-                       return new SessionState(catalogManager, moduleManager, functionCatalog);
+                       return new SessionState(config, catalogManager, moduleManager, functionCatalog);
                }
        }
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 760a934..d269fc4 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -110,7 +110,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 
                ModuleManager moduleManager = new ModuleManager();
 
-               FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
+               FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
 
                Map<String, String> executorProperties = settings.toExecutorProperties();
                Executor executor = lookupExecutor(executorProperties, executionEnvironment);
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java
index 1a0cf5a..3ddf5c8 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java
@@ -90,13 +90,14 @@ public class StreamTableEnvironmentImplTest {
        private StreamTableEnvironmentImpl getStreamTableEnvironment(
                        StreamExecutionEnvironment env,
                        DataStreamSource<Integer> elements) {
+               TableConfig config = new TableConfig();
                CatalogManager catalogManager = new CatalogManager("cat", new GenericInMemoryCatalog("cat", "db"));
                ModuleManager moduleManager = new ModuleManager();
                return new StreamTableEnvironmentImpl(
                        catalogManager,
                        moduleManager,
-                       new FunctionCatalog(catalogManager, moduleManager),
-                       new TableConfig(),
+                       new FunctionCatalog(config, catalogManager, moduleManager),
+                       config,
                        env,
                        new TestPlanner(elements.getTransformation()),
                        new ExecutorMock(),
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 712c6b2..5429509 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -169,6 +169,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
                this.planner = planner;
                this.parser = planner.getParser();
                this.operationTreeBuilder = OperationTreeBuilder.create(
+                       tableConfig,
                        functionCatalog,
                        path -> {
                                try {
@@ -189,18 +190,19 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
        public static TableEnvironmentImpl create(EnvironmentSettings settings) {
 
+               TableConfig tableConfig = new TableConfig();
+
                CatalogManager catalogManager = new CatalogManager(
                        settings.getBuiltInCatalogName(),
                        new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
 
                ModuleManager moduleManager = new ModuleManager();
-               FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
+               FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
 
                Map<String, String> executorProperties = settings.toExecutorProperties();
                Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
                        .create(executorProperties);
 
-               TableConfig tableConfig = new TableConfig();
                Map<String, String> plannerProperties = settings.toPlannerProperties();
                Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
                        .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index be94b0d..1838c13 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -55,6 +56,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class FunctionCatalog implements FunctionLookup {
+       private final TableConfig config;
        private final CatalogManager catalogManager;
        private final ModuleManager moduleManager;
 
@@ -66,7 +68,11 @@ public class FunctionCatalog implements FunctionLookup {
         */
        private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
 
-       public FunctionCatalog(CatalogManager catalogManager, ModuleManager moduleManager) {
+       public FunctionCatalog(
+                       TableConfig config,
+                       CatalogManager catalogManager,
+                       ModuleManager moduleManager) {
+               this.config = checkNotNull(config);
                this.catalogManager = checkNotNull(catalogManager);
                this.moduleManager = checkNotNull(moduleManager);
        }
@@ -76,7 +82,8 @@ public class FunctionCatalog implements FunctionLookup {
        }
 
        public void registerTempSystemScalarFunction(String name, ScalarFunction function) {
-               UserDefinedFunctionHelper.validateInstantiation(function.getClass());
+               UserDefinedFunctionHelper.prepareFunction(config, function);
+
                registerTempSystemFunction(
                        name,
                        new ScalarFunctionDefinition(name, function)
@@ -87,10 +94,7 @@ public class FunctionCatalog implements FunctionLookup {
                        String name,
                        TableFunction<T> function,
                        TypeInformation<T> resultType) {
-               // check if class not Scala object
-               UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
-               // check if class could be instantiated
-               UserDefinedFunctionHelper.validateInstantiation(function.getClass());
+               UserDefinedFunctionHelper.prepareFunction(config, function);
 
                registerTempSystemFunction(
                        name,
@@ -106,10 +110,7 @@ public class FunctionCatalog implements FunctionLookup {
                        UserDefinedAggregateFunction<T, ACC> function,
                        TypeInformation<T> resultType,
                        TypeInformation<ACC> accType) {
-               // check if class not Scala object
-               UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
-               // check if class could be instantiated
-               UserDefinedFunctionHelper.validateInstantiation(function.getClass());
+               UserDefinedFunctionHelper.prepareFunction(config, function);
 
                final FunctionDefinition definition;
                if (function instanceof AggregateFunction) {
@@ -135,7 +136,8 @@ public class FunctionCatalog implements FunctionLookup {
        }
 
        public void registerTempCatalogScalarFunction(ObjectIdentifier oi, ScalarFunction function) {
-               UserDefinedFunctionHelper.validateInstantiation(function.getClass());
+               UserDefinedFunctionHelper.prepareFunction(config, function);
+
                registerTempCatalogFunction(
                        oi,
                        new ScalarFunctionDefinition(oi.getObjectName(), function)
@@ -146,10 +148,7 @@ public class FunctionCatalog implements FunctionLookup {
                        ObjectIdentifier oi,
                        TableFunction<T> function,
                        TypeInformation<T> resultType) {
-               // check if class not Scala object
-               UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
-               // check if class could be instantiated
-               UserDefinedFunctionHelper.validateInstantiation(function.getClass());
+               UserDefinedFunctionHelper.prepareFunction(config, function);
 
                registerTempCatalogFunction(
                        oi,
@@ -165,10 +164,7 @@ public class FunctionCatalog implements FunctionLookup {
                        UserDefinedAggregateFunction<T, ACC> function,
                        TypeInformation<T> resultType,
                        TypeInformation<ACC> accType) {
-               // check if class not Scala object
-               UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
-               // check if class could be instantiated
-               UserDefinedFunctionHelper.validateInstantiation(function.getClass());
+               UserDefinedFunctionHelper.prepareFunction(config, function);
 
                final FunctionDefinition definition;
                if (function instanceof AggregateFunction) {
@@ -378,7 +374,7 @@ public class FunctionCatalog implements FunctionLookup {
                )).orElseGet(() -> resolvePreciseFunctionReference(oi));
        }
 
-               @Override
+       @Override
        public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() {
                Preconditions.checkNotNull(
                        plannerTypeInferenceUtil,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 1763404..78a18a7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.expressions.resolver;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.CallExpression;
@@ -95,6 +96,8 @@ public class ExpressionResolver {
 
        private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR = new VerifyResolutionVisitor();
 
+       private final TableConfig config;
+
        private final FieldReferenceLookup fieldLookup;
 
        private final TableReferenceLookup tableLookup;
@@ -108,11 +111,13 @@ public class ExpressionResolver {
        private final Map<Expression, LocalOverWindow> localOverWindows;
 
        private ExpressionResolver(
+                       TableConfig config,
                        TableReferenceLookup tableLookup,
                        FunctionLookup functionLookup,
                        FieldReferenceLookup fieldLookup,
                        List<OverWindow> localOverWindows,
                        List<LocalReferenceExpression> localReferences) {
+               this.config = Preconditions.checkNotNull(config);
                this.tableLookup = Preconditions.checkNotNull(tableLookup);
                this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
                this.functionLookup = Preconditions.checkNotNull(functionLookup);
@@ -128,16 +133,18 @@ public class ExpressionResolver {
         * Creates a builder for {@link ExpressionResolver}. One can add additional properties to the resolver
         * like e.g. {@link GroupWindow} or {@link OverWindow}. You can also add additional {@link ResolverRule}.
         *
+        * @param config general configuration
         * @param tableCatalog a way to lookup a table reference by name
         * @param functionLookup a way to lookup call by name
         * @param inputs inputs to use for field resolution
         * @return builder for resolver
         */
        public static ExpressionResolverBuilder resolverFor(
+                       TableConfig config,
                        TableReferenceLookup tableCatalog,
                        FunctionLookup functionLookup,
                        QueryOperation... inputs) {
-               return new ExpressionResolverBuilder(inputs, tableCatalog, functionLookup);
+               return new ExpressionResolverBuilder(inputs, config, tableCatalog, functionLookup);
        }
 
        /**
@@ -242,6 +249,11 @@ public class ExpressionResolver {
        private class ExpressionResolverContext implements ResolverRule.ResolutionContext {
 
                @Override
+               public TableConfig configuration() {
+                       return config;
+               }
+
+               @Override
                public FieldReferenceLookup referenceLookup() {
                        return fieldLookup;
                }
@@ -341,6 +353,7 @@ public class ExpressionResolver {
         */
        public static class ExpressionResolverBuilder {
 
+               private final TableConfig config;
                private final List<QueryOperation> queryOperations;
                private final TableReferenceLookup tableCatalog;
                private final FunctionLookup functionLookup;
@@ -349,8 +362,10 @@ public class ExpressionResolver {
 
                private ExpressionResolverBuilder(
                                QueryOperation[] queryOperations,
+                               TableConfig config,
                                TableReferenceLookup tableCatalog,
                                FunctionLookup functionLookup) {
+                       this.config = config;
                        this.queryOperations = Arrays.asList(queryOperations);
                        this.tableCatalog = tableCatalog;
                        this.functionLookup = functionLookup;
@@ -368,6 +383,7 @@ public class ExpressionResolver {
 
                public ExpressionResolver build() {
                        return new ExpressionResolver(
+                               config,
                                tableCatalog,
                                functionLookup,
                                new FieldReferenceLookup(queryOperations),
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 122b207..a31efa4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -29,10 +29,16 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.functions.TableAggregateFunctionDefinition;
+import org.apache.flink.table.functions.TableFunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.TypeInference;
@@ -58,13 +64,14 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoT
 
 /**
 * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers
- * the output data type. All function calls are resolved {@link CallExpression} after applying this
- * rule.
+ * the output data type. All function calls are resolved {@link CallExpression} after applying this rule.
  *
  * <p>This rule also resolves {@code flatten()} calls on composite types.
  *
  * <p>If the call expects different types of arguments, but the given arguments have types that can
  * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
+ *
+ * <p>It validates and prepares inline, unregistered {@link UserDefinedFunction}s.
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {
@@ -89,7 +96,7 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 
                @Override
                public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
-                       final FunctionDefinition definition = unresolvedCall.getFunctionDefinition();
+                       final FunctionDefinition definition = prepareUserDefinedFunction(unresolvedCall.getFunctionDefinition());
 
                        final String name = unresolvedCall.getFunctionIdentifier()
                                .map(FunctionIdentifier::toString)
@@ -220,6 +227,43 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
                                })
                                .collect(Collectors.toList());
                }
+
+               /**
+                * Validates and cleans an inline, unregistered {@link UserDefinedFunction}.
+                */
+               private FunctionDefinition prepareUserDefinedFunction(FunctionDefinition definition) {
+                       if (definition instanceof ScalarFunctionDefinition) {
+                               final ScalarFunctionDefinition sf = (ScalarFunctionDefinition) definition;
+                               UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), sf.getScalarFunction());
+                               return new ScalarFunctionDefinition(
+                                       sf.getName(),
+                                       sf.getScalarFunction());
+                       } else if (definition instanceof TableFunctionDefinition) {
+                               final TableFunctionDefinition tf = (TableFunctionDefinition) definition;
+                               UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), tf.getTableFunction());
+                               return new TableFunctionDefinition(
+                                       tf.getName(),
+                                       tf.getTableFunction(),
+                                       tf.getResultType());
+                       } else if (definition instanceof AggregateFunctionDefinition) {
+                               final AggregateFunctionDefinition af = (AggregateFunctionDefinition) definition;
+                               UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), af.getAggregateFunction());
+                               return new AggregateFunctionDefinition(
+                                       af.getName(),
+                                       af.getAggregateFunction(),
+                                       af.getResultTypeInfo(),
+                                       af.getAccumulatorTypeInfo());
+                       } else if (definition instanceof TableAggregateFunctionDefinition) {
+                               final TableAggregateFunctionDefinition taf = (TableAggregateFunctionDefinition) definition;
+                               UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), taf.getTableAggregateFunction());
+                               return new TableAggregateFunctionDefinition(
+                                       taf.getName(),
+                                       taf.getTableAggregateFunction(),
+                                       taf.getResultTypeInfo(),
+                                       taf.getAccumulatorTypeInfo());
+                       }
+                       return definition;
+               }
        }
 
        // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
index 0a7fed0..d8525c8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
@@ -48,6 +49,11 @@ public interface ResolverRule {
        interface ResolutionContext {
 
                /**
+                * Access to configuration.
+                */
+               TableConfig configuration();
+
+               /**
                 * Access to available {@link org.apache.flink.table.expressions.FieldReferenceExpression} in inputs.
                 */
                FieldReferenceLookup referenceLookup();
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java
index 6069cb9..35398ec 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.functions;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig.ClosureCleanerLevel;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.table.api.TableConfig;
@@ -146,9 +147,20 @@ public class UserDefinedFunctionHelper {
        }
 
        /**
+        * Prepares a {@link UserDefinedFunction} for usage in the API.
+        */
+       public static void prepareFunction(TableConfig config, UserDefinedFunction function) {
+               if (function instanceof TableFunction) {
+                       UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
+               }
+               UserDefinedFunctionHelper.validateInstantiation(function.getClass());
+               UserDefinedFunctionHelper.cleanFunction(config, function);
+       }
+
+       /**
         * Checks if a user-defined function can be easily instantiated.
         */
-       public static void validateInstantiation(Class<?> clazz) {
+       private static void validateInstantiation(Class<?> clazz) {
                if (!InstantiationUtil.isPublic(clazz)) {
                        throw new ValidationException(String.format("Function class %s is not public.", clazz.getCanonicalName()));
                } else if (!InstantiationUtil.isProperClass(clazz)) {
@@ -159,18 +171,36 @@ public class UserDefinedFunctionHelper {
        }
 
        /**
-        * Check whether this is a Scala object. It is forbidden to use {@link TableFunction} implemented
-        * by a Scala object, since concurrent risks.
+        * Check whether this is a Scala object. Using Scala objects can lead to concurrency issues,
+        * e.g., due to a shared collector.
         */
-       public static void validateNotSingleton(Class<?> clazz) {
+       private static void validateNotSingleton(Class<?> clazz) {
                // TODO it is not a good way to check singleton. Maybe improve it further.
                if (Arrays.stream(clazz.getFields()).anyMatch(f -> f.getName().equals("MODULE$"))) {
                        throw new ValidationException(String.format(
-                               "TableFunction implemented by class %s is a Scala object. This is forbidden because of concurrency" +
+                               "Function implemented by class %s is a Scala object. This is forbidden because of concurrency" +
                                        " problems when using them.", clazz.getCanonicalName()));
                }
        }
 
+       /**
+        * Modifies a function instance by removing any reference to outer classes. This enables
+        * non-static inner function classes.
+        */
+       private static void cleanFunction(TableConfig config, UserDefinedFunction function) {
+               final ClosureCleanerLevel level = config.getConfiguration().get(PipelineOptions.CLOSURE_CLEANER_LEVEL);
+               try {
+                       ClosureCleaner.clean(function, level, true);
+               } catch (Throwable t) {
+                       throw new ValidationException(
+                               String.format(
+                                       "Function class '%s' is not serializable. Make sure that the class is self-contained " +
+                                               "(i.e. no references to outer classes) and all inner fields are serializable as well.",
+                                       function.getClass()),
+                               t);
+               }
+       }
+
        private UserDefinedFunctionHelper() {
        }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
index a27aeba..05fa8b3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.FunctionLookup;
@@ -85,6 +86,7 @@ import static org.apache.flink.table.operations.SetQueryOperation.SetQueryOperat
 @Internal
 public final class OperationTreeBuilder {
 
+       private final TableConfig config;
        private final FunctionLookup functionCatalog;
        private final TableReferenceLookup tableReferenceLookup;
        private final LookupCallResolver lookupResolver;
@@ -100,6 +102,7 @@ public final class OperationTreeBuilder {
        private final JoinOperationFactory joinOperationFactory;
 
        private OperationTreeBuilder(
+                       TableConfig config,
                        FunctionLookup functionLookup,
                        TableReferenceLookup tableReferenceLookup,
                        ProjectionOperationFactory projectionOperationFactory,
@@ -108,6 +111,7 @@ public final class OperationTreeBuilder {
                        SetOperationFactory setOperationFactory,
                        AggregateOperationFactory aggregateOperationFactory,
                        JoinOperationFactory joinOperationFactory) {
+               this.config = config;
                this.functionCatalog = functionLookup;
                this.tableReferenceLookup = tableReferenceLookup;
                this.projectionOperationFactory = projectionOperationFactory;
@@ -120,10 +124,12 @@ public final class OperationTreeBuilder {
        }
 
        public static OperationTreeBuilder create(
+                       TableConfig config,
                        FunctionLookup functionCatalog,
                        TableReferenceLookup tableReferenceLookup,
                        boolean isStreamingMode) {
                return new OperationTreeBuilder(
+                       config,
                        functionCatalog,
                        tableReferenceLookup,
                        new ProjectionOperationFactory(),
@@ -168,7 +174,11 @@ public final class OperationTreeBuilder {
                        boolean explicitAlias,
                        List<OverWindow> overWindows) {
 
-               ExpressionResolver resolver = ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, child)
+               ExpressionResolver resolver = ExpressionResolver.resolverFor(
+                               config,
+                               tableReferenceLookup,
+                               functionCatalog,
+                               child)
                        .withOverWindows(overWindows)
                        .build();
                List<ResolvedExpression> projections = resolver.resolve(projectList);
@@ -233,6 +243,7 @@ public final class OperationTreeBuilder {
                ResolvedGroupWindow resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver);
 
                ExpressionResolver resolverWithWindowReferences = ExpressionResolver.resolverFor(
+                               config,
                                tableReferenceLookup,
                                functionCatalog,
                                child)
@@ -282,9 +293,10 @@ public final class OperationTreeBuilder {
                // Step3: resolve expressions, including grouping, aggregates and window properties.
                ResolvedGroupWindow resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver);
                ExpressionResolver resolverWithWindowReferences = ExpressionResolver.resolverFor(
-                       tableReferenceLookup,
-                       functionCatalog,
-                       child)
+                               config,
+                               tableReferenceLookup,
+                               functionCatalog,
+                               child)
                        .withLocalReferences(
                                new LocalReferenceExpression(
                                        resolvedWindow.getAlias(),
@@ -327,7 +339,12 @@ public final class OperationTreeBuilder {
                        JoinType joinType,
                        Optional<Expression> condition,
                        boolean correlated) {
-               ExpressionResolver resolver = ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, left, right)
+               ExpressionResolver resolver = ExpressionResolver.resolverFor(
+                               config,
+                               tableReferenceLookup,
+                               functionCatalog,
+                               left,
+                               right)
                        .build();
                Optional<ResolvedExpression> resolvedCondition = condition.map(expr -> resolveSingleExpression(expr, resolver));
 
@@ -356,6 +373,7 @@ public final class OperationTreeBuilder {
 
        public Expression resolveExpression(Expression expression, QueryOperation... tableOperation) {
                ExpressionResolver resolver = ExpressionResolver.resolverFor(
+                       config,
                        tableReferenceLookup,
                        functionCatalog,
                        tableOperation).build();
@@ -660,6 +678,7 @@ public final class OperationTreeBuilder {
                ResolvedGroupWindow resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver);
 
                ExpressionResolver resolverWithWindowReferences = ExpressionResolver.resolverFor(
+                       config,
                        tableReferenceLookup,
                        functionCatalog,
                        child)
@@ -758,7 +777,12 @@ public final class OperationTreeBuilder {
        }
 
        private ExpressionResolver getResolver(QueryOperation child) {
-               return ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, child).build();
+               return ExpressionResolver.resolverFor(
+                               config,
+                               tableReferenceLookup,
+                               functionCatalog,
+                               child)
+                       .build();
        }
 
        private static class NoWindowPropertyChecker extends ApiExpressionDefaultVisitor<Void> {
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index 05be6d3..bdf64fb 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
@@ -61,6 +62,7 @@ public class FunctionCatalogTest {
                catalog = new GenericInMemoryCatalog(testCatalogName);
                moduleManager = new ModuleManager();
                functionCatalog = new FunctionCatalog(
+                       TableConfig.getDefault(),
                        new CatalogManager(testCatalogName, catalog), moduleManager);
        }
 
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index fa2c58c..57af810 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -65,14 +65,15 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
        }
 
        private static TableEnvironmentMock getInstance(boolean isStreamingMode) {
+               final TableConfig config = createTableConfig();
                final CatalogManager catalogManager = createCatalogManager();
                final ModuleManager moduleManager = new ModuleManager();
                return new TableEnvironmentMock(
                        catalogManager,
                        moduleManager,
-                       createTableConfig(),
+                       config,
                        createExecutor(),
-                       createFunctionCatalog(catalogManager, moduleManager),
+                       createFunctionCatalog(config, catalogManager, moduleManager),
                        createPlanner(),
                        isStreamingMode);
        }
@@ -93,8 +94,11 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
                return new ExecutorMock();
        }
 
-       private static FunctionCatalog createFunctionCatalog(CatalogManager catalogManager, ModuleManager moduleManager) {
-               return new FunctionCatalog(catalogManager, moduleManager);
+       private static FunctionCatalog createFunctionCatalog(
+                       TableConfig config,
+                       CatalogManager catalogManager,
+                       ModuleManager moduleManager) {
+               return new FunctionCatalog(config, catalogManager, moduleManager);
        }
 
        private static PlannerMock createPlanner() {
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index fcddda9..5913d8a 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -315,7 +315,7 @@ object StreamTableEnvironmentImpl {
      new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
 
     val moduleManager = new ModuleManager
-    val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
+    val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager)
 
     val executorProperties = settings.toExecutorProperties
     val executor = lookupExecutor(executorProperties, executionEnvironment)
diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala
index e238ddc..48bebee 100644
--- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala
@@ -81,6 +81,7 @@ class StreamTableEnvironmentImplTest {
   private def getStreamTableEnvironment(
       env: StreamExecutionEnvironment,
       elements: DataStream[Int]) = {
+    val config = new TableConfig
     val catalogManager = new CatalogManager(
       "cat",
       new GenericInMemoryCatalog("cat", "db"))
@@ -88,8 +89,8 @@ class StreamTableEnvironmentImplTest {
     new StreamTableEnvironmentImpl(
       catalogManager,
       moduleManager,
-      new FunctionCatalog(catalogManager, moduleManager),
-      new TableConfig,
+      new FunctionCatalog(config, catalogManager, moduleManager),
+      config,
       env,
       new TestPlanner(elements.javaStream.getTransformation),
       new ExecutorMock,
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
index b06c55a..df01db0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
@@ -49,6 +49,10 @@ public final class AggregateFunctionDefinition implements FunctionDefinition {
                this.accumulatorTypeInfo = Preconditions.checkNotNull(accTypeInfo);
        }
 
+       public String getName() {
+               return name;
+       }
+
        public AggregateFunction<?, ?> getAggregateFunction() {
                return aggregateFunction;
        }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
index fc365eb..279d436 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
@@ -40,6 +40,10 @@ public final class ScalarFunctionDefinition implements FunctionDefinition {
                this.scalarFunction = Preconditions.checkNotNull(scalarFunction);
        }
 
+       public String getName() {
+               return name;
+       }
+
        public ScalarFunction getScalarFunction() {
                return scalarFunction;
        }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
index 03c4e33..d5c4f1c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
@@ -49,6 +49,10 @@ public final class TableAggregateFunctionDefinition implements FunctionDefinitio
                this.accumulatorTypeInfo = Preconditions.checkNotNull(accTypeInfo);
        }
 
+       public String getName() {
+               return name;
+       }
+
        public TableAggregateFunction<?, ?> getTableAggregateFunction() {
                return aggregateFunction;
        }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
index 0dcf569..8a6392f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
@@ -46,6 +46,10 @@ public final class TableFunctionDefinition implements FunctionDefinition {
                this.resultType = Preconditions.checkNotNull(resultType);
        }
 
+       public String getName() {
+               return name;
+       }
+
        public TableFunction<?> getTableFunction() {
                return tableFunction;
        }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
index 3565a77..a4c3261 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
@@ -43,7 +43,7 @@ public abstract class UserDefinedFunction implements FunctionDefinition, Seriali
         */
        public final String functionIdentifier() {
                final String md5 = 
EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
-               return getClass().getCanonicalName().replace('.', 
'$').concat("$").concat(md5);
+               return getClass().getName().replace('.', 
'$').concat("$").concat(md5);
        }
 
        /**
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
index c2ef623..61cc347 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
@@ -45,6 +45,7 @@ public class CallExpressionResolver {
                        .build()
                        .getCluster().getPlanner().getContext().unwrap(FlinkContext.class);
                this.resolver = ExpressionResolver.resolverFor(
+                       context.getTableConfig(),
                        name -> Optional.empty(),
                        context.getFunctionCatalog()).build();
        }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index c63bef5..cc7d681 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -614,7 +614,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
       function: UserDefinedFunction,
      functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext],
       contextTerm: String = null): String = {
-    val classQualifier = function.getClass.getCanonicalName
+    val classQualifier = function.getClass.getName
     val fieldTerm = CodeGenUtils.udfFieldName(function)
 
     addReusableObjectInternal(function, fieldTerm, classQualifier)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
index b4aeb41..b1bf10a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
@@ -200,10 +200,6 @@ case class PlannerTableFunctionCall(
   override private[flink] def children: Seq[PlannerExpression] = parameters
 
   override def validateInput(): ValidationResult = {
-    // check if not Scala object
-    UserDefinedFunctionHelper.validateNotSingleton(tableFunction.getClass)
-    // check if class could be instantiated
-    UserDefinedFunctionHelper.validateInstantiation(tableFunction.getClass)
     // look for a signature that matches the input types
     val signature = parameters.map(_.resultType).map(fromLegacyInfoToDataType)
     val foundMethod = getUserDefinedMethod(
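
Note: the two checks removed here are not gone; per the commit message they
now run in exactly two places (registration in the function catalog, or
resolution of inline, unregistered functions) instead of on every call
validation. A hedged Java sketch of what the helper checks enforce
(myFunction is an assumed UserDefinedFunction instance):

    // Rejects Scala objects (singletons), which cannot be cleaned safely.
    UserDefinedFunctionHelper.validateNotSingleton(myFunction.getClass());
    // Ensures the class can actually be instantiated by the runtime.
    UserDefinedFunctionHelper.validateInstantiation(myFunction.getClass());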
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index c5bdef7..bf3567b 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -89,7 +89,10 @@ public class SqlToOperationConverterTest {
        private final CatalogManager catalogManager =
                new CatalogManager("builtin", catalog);
        private final ModuleManager moduleManager = new ModuleManager();
-       private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
+       private final FunctionCatalog functionCatalog = new FunctionCatalog(
+               tableConfig,
+               catalogManager,
+               moduleManager);
        private final PlannerContext plannerContext =
                new PlannerContext(tableConfig,
                        functionCatalog,
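
Note: the FunctionCatalog constructor was widened from (catalogManager,
moduleManager) to additionally take the TableConfig, as the repeated test
updates in this commit show. A hedged Java sketch of the new construction
(names mirror the diffs above):

    import org.apache.flink.table.api.TableConfig;
    import org.apache.flink.table.catalog.CatalogManager;
    import org.apache.flink.table.catalog.FunctionCatalog;
    import org.apache.flink.table.catalog.GenericInMemoryCatalog;
    import org.apache.flink.table.module.ModuleManager;

    TableConfig tableConfig = TableConfig.getDefault();
    CatalogManager catalogManager = new CatalogManager(
        "builtin", new GenericInMemoryCatalog("builtin", "default"));
    ModuleManager moduleManager = new ModuleManager();
    FunctionCatalog functionCatalog =
        new FunctionCatalog(tableConfig, catalogManager, moduleManager);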
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
index d42db1b..5e101d5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
@@ -45,11 +45,12 @@ import java.util.Collections
 class WatermarkGeneratorCodeGenTest {
 
   // mock FlinkPlannerImpl to avoid discovering TableEnvironment and Executor.
+  val config = new TableConfig
   val catalog = new GenericInMemoryCatalog("MockCatalog", "default")
   val catalogManager = new CatalogManager("builtin", catalog)
-  val functionCatalog = new FunctionCatalog(catalogManager, new ModuleManager)
+  val functionCatalog = new FunctionCatalog(config, catalogManager, new ModuleManager)
   val plannerContext = new PlannerContext(
-    new TableConfig,
+    config,
     functionCatalog,
     catalogManager,
     asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
index e248e49..ce25e6a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -84,7 +84,8 @@ class AggCallSelectivityEstimatorTest {
     val planner = mock(classOf[AbstractRelOptPlanner])
     val catalogManager = mock(classOf[CatalogManager])
     val moduleManager = mock(classOf[ModuleManager])
-    val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
+    val config = new TableConfig
+    val functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager)
     val context = new FlinkContextImpl(new TableConfig, functionCatalog, catalogManager)
     when(tableScan, "getCluster").thenReturn(cluster)
     when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index aa3d48f..18cd61a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -91,7 +91,7 @@ class FlinkRelMdHandlerTestBase {
   val plannerContext: PlannerContext =
     new PlannerContext(
       tableConfig,
-      new FunctionCatalog(catalogManager, moduleManager),
+      new FunctionCatalog(tableConfig, catalogManager, moduleManager),
       catalogManager,
       CalciteSchema.from(rootSchema),
       util.Arrays.asList(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
index f89332b..6de6fa5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
@@ -88,7 +88,7 @@ class SelectivityEstimatorTest {
     val planner = mock(classOf[AbstractRelOptPlanner])
     val catalogManager = mock(classOf[CatalogManager])
     val moduleManager = mock(classOf[ModuleManager])
-    val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
+    val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager)
     val context: FlinkContext = new FlinkContextImpl(tableConfig, functionCatalog, catalogManager)
     when(tableScan, "getCluster").thenReturn(cluster)
     when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 35942d8..62c8e9f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.api.common.typeinfo.Types
-import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.{DataTypes, TableConfig}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
@@ -56,7 +56,10 @@ class RexNodeExtractorTest extends RexNodeTestBase {
   val catalogManager = new CatalogManager(
     defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
   val moduleManager = new ModuleManager
-  private val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
+  private val functionCatalog = new FunctionCatalog(
+    TableConfig.getDefault,
+    catalogManager,
+    moduleManager)
 
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
     new ExpressionBridge[PlannerExpression](
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
index 573274c..278a2b0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.table.planner.runtime.stream.table
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.planner.expressions.utils.{Func1, Func13, Func23, Func24, Func25, RichFunc1, RichFunc2}
+import org.apache.flink.table.planner.expressions.utils._
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, TestingRetractSink, UserDefinedFunctionTestUtils}
 import org.apache.flink.types.Row
-
 import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
@@ -294,6 +294,79 @@ class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
   }
 
   @Test
+  def testInlineScalarFunction(): Unit = {
+    val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a)
+
+    val sink = new TestingAppendSink
+    val result = t.select(
+      (new ScalarFunction() {
+        def eval(i: Int, suffix: String): String = {
+          suffix + i
+        }
+      })('a, ">>"))
+    result.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      ">>1",
+      ">>2",
+      ">>3",
+      ">>4"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testNonStaticObjectScalarFunction(): Unit = {
+    val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a)
+
+    val sink = new TestingAppendSink
+    val result = t.select(NonStaticObjectScalarFunction('a, ">>"))
+
+    result.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      ">>1",
+      ">>2",
+      ">>3",
+      ">>4"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  object NonStaticObjectScalarFunction extends ScalarFunction {
+    def eval(i: Int, suffix: String): String = {
+      suffix + i
+    }
+  }
+
+  @Test(expected = classOf[ValidationException]) // see FLINK-15162
+  def testNonStaticClassScalarFunction(): Unit = {
+    val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a)
+
+    val sink = new TestingAppendSink
+    val result = t.select(new NonStaticClassScalarFunction()('a, ">>"))
+
+    result.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      ">>1",
+      ">>2",
+      ">>3",
+      ">>4"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  class NonStaticClassScalarFunction extends ScalarFunction {
+    def eval(i: Int, suffix: String): String = {
+      suffix + i
+    }
+  }
+
+  @Test
   def testMapType(): Unit = {
     val ds = env.fromCollection(tupleData3).toTable(tEnv).select(map('_1, '_3))
 
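Note: the three new tests above pin down the relaxed constraints: a fully
inline (anonymous) ScalarFunction and a non-static Scala object now work,
while a non-static inner class still fails with a ValidationException until
FLINK-15162 is fixed. The underlying problem is the hidden reference such
classes keep to their enclosing instance, which the ClosureCleaner must
remove; a minimal, self-contained Java illustration (CaptureDemo and Holder
are illustrative only):

    import java.io.Serializable;
    import java.lang.reflect.Field;

    public class CaptureDemo {
        static class Holder { // not serializable itself
            Serializable makeFunction() {
                // The anonymous class captures Holder via a synthetic field.
                return new Serializable() {};
            }
        }
        public static void main(String[] args) {
            Serializable f = new Holder().makeFunction();
            for (Field field : f.getClass().getDeclaredFields()) {
                System.out.println(field.getName()); // prints "this$0"
            }
        }
    }
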
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index c65f5c7..16edbbf 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1087,7 +1087,7 @@ object TestingTableEnvironment {
             settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
     }
     val moduleManager = new ModuleManager
-    val functionCatalog = new FunctionCatalog(catalogMgr, moduleManager)
+    val functionCatalog = new FunctionCatalog(tableConfig, catalogMgr, moduleManager)
     val plannerProperties = settings.toPlannerProperties
     val executorProperties = settings.toExecutorProperties
     val executor = ComponentFactoryService.find(classOf[ExecutorFactory],
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 6b21ca9..82f5091 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -60,7 +60,7 @@ abstract class TableEnvImpl(
 
   // Table API/SQL function catalog
   private[flink] val functionCatalog: FunctionCatalog =
-    new FunctionCatalog(catalogManager, moduleManager)
+    new FunctionCatalog(config, catalogManager, moduleManager)
 
   // temporary utility until we don't use planner expressions anymore
   functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
@@ -90,6 +90,7 @@ abstract class TableEnvImpl(
   }
 
   private[flink] val operationTreeBuilder = OperationTreeBuilder.create(
+    config,
     functionCatalog,
     tableLookup,
     isStreamingMode)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 3bbbc65..9925f0d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -1649,7 +1649,7 @@ abstract class CodeGenerator(
       function: UserDefinedFunction,
       contextTerm: String = null,
      functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext]): String = {
-    val classQualifier = function.getClass.getCanonicalName
+    val classQualifier = function.getClass.getName
     val functionSerializedData = EncodingUtils.encodeObjectToString(function)
     val fieldTerm = s"function_${function.functionIdentifier}"
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
index b40053a..fc3d263 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -298,10 +298,6 @@ case class PlannerTableFunctionCall(
   override private[flink] def children: Seq[PlannerExpression] = parameters
 
   override def validateInput(): ValidationResult = {
-    // check if not Scala object
-    UserDefinedFunctionHelper.validateNotSingleton(tableFunction.getClass)
-    // check if class could be instantiated
-    UserDefinedFunctionHelper.validateInstantiation(tableFunction.getClass)
     // look for a signature that matches the input types
     val signature = parameters.map(_.resultType)
    val foundMethod = getUserDefinedMethod(tableFunction, "eval", typeInfoToClass(signature))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index 334fcb3..44b100f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -23,7 +23,7 @@ import java.util
 import org.apache.calcite.plan.RelOptRule.{none, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rex.RexProgram
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions.{Expression, PlannerExpression}
 import org.apache.flink.table.module.ModuleManager
@@ -74,7 +74,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
       RexProgramExtractor.extractConjunctiveConditions(
         program,
         call.builder().getRexBuilder,
-        new FunctionCatalog(catalogManager, new ModuleManager))
+        new FunctionCatalog(TableConfig.getDefault, catalogManager, new ModuleManager))
     if (predicates.isEmpty) {
       // no condition can be translated to expression
       return
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index f5d9231..7609994 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -86,7 +86,10 @@ public class SqlToOperationConverterTest {
        private final CatalogManager catalogManager =
                new CatalogManager("builtin", catalog);
        private final ModuleManager moduleManager = new ModuleManager();
-       private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
+       private final FunctionCatalog functionCatalog = new FunctionCatalog(
+               tableConfig,
+               catalogManager,
+               moduleManager);
        private final PlanningConfigurationBuilder planningConfigurationBuilder =
                new PlanningConfigurationBuilder(tableConfig,
                        functionCatalog,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index a3450f4..b666039 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -208,7 +208,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
       new GenericInMemoryCatalog("default_catalog", "default_database"))
     val moduleManager: ModuleManager = new ModuleManager
     val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
-    val functionCatalog = new FunctionCatalog(manager, moduleManager)
+    val functionCatalog = new FunctionCatalog(config, manager, moduleManager)
    val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager)
     val jTEnv = new JStreamTableEnvironmentImpl(
       manager,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index 25f875f..95b57f8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -68,15 +68,16 @@ class AggregateTest extends TableTestBase {
   @Test
   def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
     val defaultCatalog = "default_catalog"
+    val config = new TableConfig
     val catalogManager = new CatalogManager(
      defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
     val moduleManager = new ModuleManager
-    val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
+    val functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager)
     val tablEnv = new StreamTableEnvironmentImpl(
       catalogManager,
       moduleManager,
       functionCatalog,
-      new TableConfig,
+      config,
       Mockito.mock(classOf[StreamExecutionEnvironment]),
       Mockito.mock(classOf[Planner]),
       Mockito.mock(classOf[Executor]),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
index d109726..1f0da00 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.{DateString, TimeString, TimestampString}
+import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.module.ModuleManager
@@ -43,6 +44,7 @@ import scala.collection.mutable
 class RexProgramExtractorTest extends RexProgramTestBase {
 
   private val functionCatalog: FunctionCatalog = new FunctionCatalog(
+    TableConfig.getDefault,
    new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog")),
     new ModuleManager
   )
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 521b139..912f23b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -20,8 +20,10 @@ package org.apache.flink.table.runtime.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils._
+import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, UserDefinedFunctionTestUtils}
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
@@ -328,6 +330,92 @@ class CalcITCase extends AbstractTestBase {
   }
 
   @Test
+  def testInlineScalarFunction(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a)
+
+    val result = t.select(
+      (new ScalarFunction() {
+        def eval(i: Int, suffix: String): String = {
+          suffix + i
+        }
+      })('a, ">>"))
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      ">>1",
+      ">>2",
+      ">>3",
+      ">>4"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNonStaticObjectScalarFunction(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a)
+
+    val result = t.select(NonStaticObjectScalarFunction('a, ">>"))
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      ">>1",
+      ">>2",
+      ">>3",
+      ">>4"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  object NonStaticObjectScalarFunction extends ScalarFunction {
+    def eval(i: Int, suffix: String): String = {
+      suffix + i
+    }
+  }
+
+  @Test(expected = classOf[ValidationException]) // see FLINK-15162
+  def testNonStaticClassScalarFunction(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a)
+
+    val result = t.select(new NonStaticClassScalarFunction()('a, ">>"))
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      ">>1",
+      ">>2",
+      ">>3",
+      ">>4"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  class NonStaticClassScalarFunction extends ScalarFunction {
+    def eval(i: Int, suffix: String): String = {
+      suffix + i
+    }
+  }
+
+  @Test
   def testMapType(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 5ebc79e..abe6f80 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -332,7 +332,7 @@ case class StreamTableTestUtil(
  private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager())
   private val moduleManager: ModuleManager = new ModuleManager
   private val executor: StreamExecutor = new StreamExecutor(javaEnv)
-  private val functionCatalog = new FunctionCatalog(manager, moduleManager)
+  private val functionCatalog = new FunctionCatalog(tableConfig, manager, moduleManager)
   private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager)
 
   val javaTableEnv = new JavaStreamTableEnvironmentImpl(
