This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ec40d8528621d7533565261b8e46de91a14c40ee Author: Timo Walther <[email protected]> AuthorDate: Mon Dec 9 17:34:12 2019 +0100 [FLINK-12283][table] Relax UDF constraints by using the ClosureCleaner This relaxes the UDF constraints as much as possible by using the ClosureCleaner. For Java users, this is very convenient. For Scala users, it improves the current status but we still need to fix FLINK-15162. It ensures that the validation and cleaning happen at exactly 2 locations: either during registration in the function catalog or during resolution of inline, unregistered functions. This closes #10519. --- .../client/gateway/local/ExecutionContext.java | 27 ++++--- .../java/internal/StreamTableEnvironmentImpl.java | 2 +- .../internal/StreamTableEnvironmentImplTest.java | 5 +- .../table/api/internal/TableEnvironmentImpl.java | 6 +- .../flink/table/catalog/FunctionCatalog.java | 36 ++++----- .../expressions/resolver/ExpressionResolver.java | 18 ++++- .../resolver/rules/ResolveCallByArgumentsRule.java | 50 +++++++++++- .../expressions/resolver/rules/ResolverRule.java | 6 ++ .../table/functions/UserDefinedFunctionHelper.java | 40 ++++++++-- .../operations/utils/OperationTreeBuilder.java | 36 +++++++-- .../flink/table/catalog/FunctionCatalogTest.java | 2 + .../flink/table/utils/TableEnvironmentMock.java | 12 ++- .../internal/StreamTableEnvironmentImpl.scala | 2 +- .../internal/StreamTableEnvironmentImplTest.scala | 5 +- .../functions/AggregateFunctionDefinition.java | 4 + .../table/functions/ScalarFunctionDefinition.java | 4 + .../TableAggregateFunctionDefinition.java | 4 + .../table/functions/TableFunctionDefinition.java | 4 + .../flink/table/functions/UserDefinedFunction.java | 2 +- .../expressions/CallExpressionResolver.java | 1 + .../planner/codegen/CodeGeneratorContext.scala | 2 +- .../flink/table/planner/expressions/call.scala | 4 - .../operations/SqlToOperationConverterTest.java | 5 +- .../codegen/WatermarkGeneratorCodeGenTest.scala | 5 +- 
.../metadata/AggCallSelectivityEstimatorTest.scala | 3 +- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 2 +- .../plan/metadata/SelectivityEstimatorTest.scala | 2 +- .../planner/plan/utils/RexNodeExtractorTest.scala | 7 +- .../planner/runtime/stream/table/CalcITCase.scala | 77 ++++++++++++++++++- .../flink/table/planner/utils/TableTestBase.scala | 2 +- .../flink/table/api/internal/TableEnvImpl.scala | 3 +- .../apache/flink/table/codegen/CodeGenerator.scala | 2 +- .../org/apache/flink/table/expressions/call.scala | 4 - .../PushFilterIntoTableSourceScanRule.scala | 4 +- .../table/sqlexec/SqlToOperationConverterTest.java | 5 +- .../api/stream/StreamTableEnvironmentTest.scala | 2 +- .../flink/table/api/stream/sql/AggregateTest.scala | 5 +- .../flink/table/plan/RexProgramExtractorTest.scala | 2 + .../table/runtime/stream/table/CalcITCase.scala | 88 ++++++++++++++++++++++ .../apache/flink/table/utils/TableTestBase.scala | 2 +- 40 files changed, 405 insertions(+), 87 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 26f5322..8a1f082 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -393,13 +393,12 @@ public class ExecutionContext<ClusterID> { private static TableEnvironment createStreamTableEnvironment( StreamExecutionEnvironment env, EnvironmentSettings settings, + TableConfig config, Executor executor, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) { - final TableConfig config = TableConfig.getDefault(); - final Map<String, String> plannerProperties = settings.toPlannerProperties(); final Planner planner = 
ComponentFactoryService.find(PlannerFactory.class, plannerProperties) .create(plannerProperties, executor, config, functionCatalog, catalogManager); @@ -441,6 +440,10 @@ public class ExecutionContext<ClusterID> { //-------------------------------------------------------------------------------------------------------------- // Step.1 Create environments //-------------------------------------------------------------------------------------------------------------- + // Step 1.0 Initialize the table configuration. + final TableConfig config = new TableConfig(); + environment.getConfiguration().asMap().forEach((k, v) -> + config.getConfiguration().setString(k, v)); // Step 1.1 Initialize the CatalogManager if required. final CatalogManager catalogManager = new CatalogManager( settings.getBuiltInCatalogName(), @@ -450,12 +453,12 @@ public class ExecutionContext<ClusterID> { // Step 1.2 Initialize the ModuleManager if required. final ModuleManager moduleManager = new ModuleManager(); // Step 1.3 Initialize the FunctionCatalog if required. - final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager); + final FunctionCatalog functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager); // Step 1.4 Set up session state. - this.sessionState = SessionState.of(catalogManager, moduleManager, functionCatalog); + this.sessionState = SessionState.of(config, catalogManager, moduleManager, functionCatalog); // Must initialize the table environment before actually the - createTableEnvironment(settings, catalogManager, moduleManager, functionCatalog); + createTableEnvironment(settings, config, catalogManager, moduleManager, functionCatalog); //-------------------------------------------------------------------------------------------------------------- // Step.2 Create modules and load them into the TableEnvironment. 
@@ -487,6 +490,7 @@ public class ExecutionContext<ClusterID> { this.sessionState = sessionState; createTableEnvironment( settings, + sessionState.config, sessionState.catalogManager, sessionState.moduleManager, sessionState.functionCatalog); @@ -495,6 +499,7 @@ public class ExecutionContext<ClusterID> { private void createTableEnvironment( EnvironmentSettings settings, + TableConfig config, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) { @@ -507,6 +512,7 @@ public class ExecutionContext<ClusterID> { tableEnv = createStreamTableEnvironment( streamExecEnv, settings, + config, executor, catalogManager, moduleManager, @@ -517,15 +523,12 @@ public class ExecutionContext<ClusterID> { executor = null; tableEnv = new BatchTableEnvironmentImpl( execEnv, - TableConfig.getDefault(), + config, catalogManager, moduleManager); } else { throw new SqlExecutionException("Unsupported execution type specified."); } - // set table configuration - environment.getConfiguration().asMap().forEach((k, v) -> - tableEnv.getConfig().getConfiguration().setString(k, v)); } private void initializeCatalogs() { @@ -760,24 +763,28 @@ public class ExecutionContext<ClusterID> { /** Represents the state that should be reused in one session. 
**/ public static class SessionState { + public final TableConfig config; public final CatalogManager catalogManager; public final ModuleManager moduleManager; public final FunctionCatalog functionCatalog; private SessionState( + TableConfig config, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) { + this.config = config; this.catalogManager = catalogManager; this.moduleManager = moduleManager; this.functionCatalog = functionCatalog; } public static SessionState of( + TableConfig config, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) { - return new SessionState(catalogManager, moduleManager, functionCatalog); + return new SessionState(config, catalogManager, moduleManager, functionCatalog); } } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java index 760a934..d269fc4 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java @@ -110,7 +110,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple ModuleManager moduleManager = new ModuleManager(); - FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager); + FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager); Map<String, String> executorProperties = settings.toExecutorProperties(); Executor executor = lookupExecutor(executorProperties, executionEnvironment); diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java index 1a0cf5a..3ddf5c8 100644 --- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java +++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java @@ -90,13 +90,14 @@ public class StreamTableEnvironmentImplTest { private StreamTableEnvironmentImpl getStreamTableEnvironment( StreamExecutionEnvironment env, DataStreamSource<Integer> elements) { + TableConfig config = new TableConfig(); CatalogManager catalogManager = new CatalogManager("cat", new GenericInMemoryCatalog("cat", "db")); ModuleManager moduleManager = new ModuleManager(); return new StreamTableEnvironmentImpl( catalogManager, moduleManager, - new FunctionCatalog(catalogManager, moduleManager), - new TableConfig(), + new FunctionCatalog(config, catalogManager, moduleManager), + config, env, new TestPlanner(elements.getTransformation()), new ExecutorMock(), diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 712c6b2..5429509 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -169,6 +169,7 @@ public class TableEnvironmentImpl implements TableEnvironment { this.planner = planner; this.parser = planner.getParser(); this.operationTreeBuilder = OperationTreeBuilder.create( + tableConfig, functionCatalog, path -> { try { @@ -189,18 +190,19 @@ public class TableEnvironmentImpl implements TableEnvironment { public static TableEnvironmentImpl 
create(EnvironmentSettings settings) { + TableConfig tableConfig = new TableConfig(); + CatalogManager catalogManager = new CatalogManager( settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())); ModuleManager moduleManager = new ModuleManager(); - FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager); + FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager); Map<String, String> executorProperties = settings.toExecutorProperties(); Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties) .create(executorProperties); - TableConfig tableConfig = new TableConfig(); Map<String, String> plannerProperties = settings.toPlannerProperties(); Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index be94b0d..1838c13 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -20,6 +20,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -55,6 +56,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ @Internal public class FunctionCatalog implements FunctionLookup { + 
private final TableConfig config; private final CatalogManager catalogManager; private final ModuleManager moduleManager; @@ -66,7 +68,11 @@ public class FunctionCatalog implements FunctionLookup { */ private PlannerTypeInferenceUtil plannerTypeInferenceUtil; - public FunctionCatalog(CatalogManager catalogManager, ModuleManager moduleManager) { + public FunctionCatalog( + TableConfig config, + CatalogManager catalogManager, + ModuleManager moduleManager) { + this.config = checkNotNull(config); this.catalogManager = checkNotNull(catalogManager); this.moduleManager = checkNotNull(moduleManager); } @@ -76,7 +82,8 @@ public class FunctionCatalog implements FunctionLookup { } public void registerTempSystemScalarFunction(String name, ScalarFunction function) { - UserDefinedFunctionHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.prepareFunction(config, function); + registerTempSystemFunction( name, new ScalarFunctionDefinition(name, function) @@ -87,10 +94,7 @@ public class FunctionCatalog implements FunctionLookup { String name, TableFunction<T> function, TypeInformation<T> resultType) { - // check if class not Scala object - UserDefinedFunctionHelper.validateNotSingleton(function.getClass()); - // check if class could be instantiated - UserDefinedFunctionHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.prepareFunction(config, function); registerTempSystemFunction( name, @@ -106,10 +110,7 @@ public class FunctionCatalog implements FunctionLookup { UserDefinedAggregateFunction<T, ACC> function, TypeInformation<T> resultType, TypeInformation<ACC> accType) { - // check if class not Scala object - UserDefinedFunctionHelper.validateNotSingleton(function.getClass()); - // check if class could be instantiated - UserDefinedFunctionHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.prepareFunction(config, function); final FunctionDefinition definition; if (function instanceof 
AggregateFunction) { @@ -135,7 +136,8 @@ public class FunctionCatalog implements FunctionLookup { } public void registerTempCatalogScalarFunction(ObjectIdentifier oi, ScalarFunction function) { - UserDefinedFunctionHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.prepareFunction(config, function); + registerTempCatalogFunction( oi, new ScalarFunctionDefinition(oi.getObjectName(), function) @@ -146,10 +148,7 @@ public class FunctionCatalog implements FunctionLookup { ObjectIdentifier oi, TableFunction<T> function, TypeInformation<T> resultType) { - // check if class not Scala object - UserDefinedFunctionHelper.validateNotSingleton(function.getClass()); - // check if class could be instantiated - UserDefinedFunctionHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.prepareFunction(config, function); registerTempCatalogFunction( oi, @@ -165,10 +164,7 @@ public class FunctionCatalog implements FunctionLookup { UserDefinedAggregateFunction<T, ACC> function, TypeInformation<T> resultType, TypeInformation<ACC> accType) { - // check if class not Scala object - UserDefinedFunctionHelper.validateNotSingleton(function.getClass()); - // check if class could be instantiated - UserDefinedFunctionHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.prepareFunction(config, function); final FunctionDefinition definition; if (function instanceof AggregateFunction) { @@ -378,7 +374,7 @@ public class FunctionCatalog implements FunctionLookup { )).orElseGet(() -> resolvePreciseFunctionReference(oi)); } - @Override + @Override public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() { Preconditions.checkNotNull( plannerTypeInferenceUtil, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java index 
1763404..78a18a7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java @@ -21,6 +21,7 @@ package org.apache.flink.table.expressions.resolver; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.GroupWindow; import org.apache.flink.table.api.OverWindow; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.expressions.CallExpression; @@ -95,6 +96,8 @@ public class ExpressionResolver { private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR = new VerifyResolutionVisitor(); + private final TableConfig config; + private final FieldReferenceLookup fieldLookup; private final TableReferenceLookup tableLookup; @@ -108,11 +111,13 @@ public class ExpressionResolver { private final Map<Expression, LocalOverWindow> localOverWindows; private ExpressionResolver( + TableConfig config, TableReferenceLookup tableLookup, FunctionLookup functionLookup, FieldReferenceLookup fieldLookup, List<OverWindow> localOverWindows, List<LocalReferenceExpression> localReferences) { + this.config = Preconditions.checkNotNull(config); this.tableLookup = Preconditions.checkNotNull(tableLookup); this.fieldLookup = Preconditions.checkNotNull(fieldLookup); this.functionLookup = Preconditions.checkNotNull(functionLookup); @@ -128,16 +133,18 @@ public class ExpressionResolver { * Creates a builder for {@link ExpressionResolver}. One can add additional properties to the resolver * like e.g. {@link GroupWindow} or {@link OverWindow}. You can also add additional {@link ResolverRule}. 
* + * @param config general configuration * @param tableCatalog a way to lookup a table reference by name * @param functionLookup a way to lookup call by name * @param inputs inputs to use for field resolution * @return builder for resolver */ public static ExpressionResolverBuilder resolverFor( + TableConfig config, TableReferenceLookup tableCatalog, FunctionLookup functionLookup, QueryOperation... inputs) { - return new ExpressionResolverBuilder(inputs, tableCatalog, functionLookup); + return new ExpressionResolverBuilder(inputs, config, tableCatalog, functionLookup); } /** @@ -242,6 +249,11 @@ public class ExpressionResolver { private class ExpressionResolverContext implements ResolverRule.ResolutionContext { @Override + public TableConfig configuration() { + return config; + } + + @Override public FieldReferenceLookup referenceLookup() { return fieldLookup; } @@ -341,6 +353,7 @@ public class ExpressionResolver { */ public static class ExpressionResolverBuilder { + private final TableConfig config; private final List<QueryOperation> queryOperations; private final TableReferenceLookup tableCatalog; private final FunctionLookup functionLookup; @@ -349,8 +362,10 @@ public class ExpressionResolver { private ExpressionResolverBuilder( QueryOperation[] queryOperations, + TableConfig config, TableReferenceLookup tableCatalog, FunctionLookup functionLookup) { + this.config = config; this.queryOperations = Arrays.asList(queryOperations); this.tableCatalog = tableCatalog; this.functionLookup = functionLookup; @@ -368,6 +383,7 @@ public class ExpressionResolver { public ExpressionResolver build() { return new ExpressionResolver( + config, tableCatalog, functionLookup, new FieldReferenceLookup(queryOperations), diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java index 122b207..a31efa4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java @@ -29,10 +29,16 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.UnresolvedCallExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.AggregateFunctionDefinition; import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.table.functions.ScalarFunctionDefinition; +import org.apache.flink.table.functions.TableAggregateFunctionDefinition; +import org.apache.flink.table.functions.TableFunctionDefinition; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.TypeInference; @@ -58,13 +64,14 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoT /** * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers - * the output data type. All function calls are resolved {@link CallExpression} after applying this - * rule. + * the output data type. All function calls are resolved {@link CallExpression} after applying this rule. 
* * <p>This rule also resolves {@code flatten()} calls on composite types. * * <p>If the call expects different types of arguments, but the given arguments have types that can * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted. + * + * <p>It validates and prepares inline, unregistered {@link UserDefinedFunction}s. */ @Internal final class ResolveCallByArgumentsRule implements ResolverRule { @@ -89,7 +96,7 @@ final class ResolveCallByArgumentsRule implements ResolverRule { @Override public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) { - final FunctionDefinition definition = unresolvedCall.getFunctionDefinition(); + final FunctionDefinition definition = prepareUserDefinedFunction(unresolvedCall.getFunctionDefinition()); final String name = unresolvedCall.getFunctionIdentifier() .map(FunctionIdentifier::toString) @@ -220,6 +227,43 @@ final class ResolveCallByArgumentsRule implements ResolverRule { }) .collect(Collectors.toList()); } + + /** + * Validates and cleans an inline, unregistered {@link UserDefinedFunction}. 
+ */ + private FunctionDefinition prepareUserDefinedFunction(FunctionDefinition definition) { + if (definition instanceof ScalarFunctionDefinition) { + final ScalarFunctionDefinition sf = (ScalarFunctionDefinition) definition; + UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), sf.getScalarFunction()); + return new ScalarFunctionDefinition( + sf.getName(), + sf.getScalarFunction()); + } else if (definition instanceof TableFunctionDefinition) { + final TableFunctionDefinition tf = (TableFunctionDefinition) definition; + UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), tf.getTableFunction()); + return new TableFunctionDefinition( + tf.getName(), + tf.getTableFunction(), + tf.getResultType()); + } else if (definition instanceof AggregateFunctionDefinition) { + final AggregateFunctionDefinition af = (AggregateFunctionDefinition) definition; + UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), af.getAggregateFunction()); + return new AggregateFunctionDefinition( + af.getName(), + af.getAggregateFunction(), + af.getResultTypeInfo(), + af.getAccumulatorTypeInfo()); + } else if (definition instanceof TableAggregateFunctionDefinition) { + final TableAggregateFunctionDefinition taf = (TableAggregateFunctionDefinition) definition; + UserDefinedFunctionHelper.prepareFunction(resolutionContext.configuration(), taf.getTableAggregateFunction()); + return new TableAggregateFunctionDefinition( + taf.getName(), + taf.getTableAggregateFunction(), + taf.getResultTypeInfo(), + taf.getAccumulatorTypeInfo()); + } + return definition; + } } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java index 0a7fed0..d8525c8 100644 --- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java @@ -19,6 +19,7 @@ package org.apache.flink.table.expressions.resolver.rules; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.LocalReferenceExpression; @@ -48,6 +49,11 @@ public interface ResolverRule { interface ResolutionContext { /** + * Access to configuration. + */ + TableConfig configuration(); + + /** * Access to available {@link org.apache.flink.table.expressions.FieldReferenceExpression} in inputs. */ FieldReferenceLookup referenceLookup(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java index 6069cb9..35398ec 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java @@ -21,6 +21,7 @@ package org.apache.flink.table.functions; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig.ClosureCleanerLevel; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.table.api.TableConfig; @@ -146,9 +147,20 @@ public class UserDefinedFunctionHelper { } /** + * Prepares a {@link UserDefinedFunction} for usage in the API. 
+ */ + public static void prepareFunction(TableConfig config, UserDefinedFunction function) { + if (function instanceof TableFunction) { + UserDefinedFunctionHelper.validateNotSingleton(function.getClass()); + } + UserDefinedFunctionHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.cleanFunction(config, function); + } + + /** * Checks if a user-defined function can be easily instantiated. */ - public static void validateInstantiation(Class<?> clazz) { + private static void validateInstantiation(Class<?> clazz) { if (!InstantiationUtil.isPublic(clazz)) { throw new ValidationException(String.format("Function class %s is not public.", clazz.getCanonicalName())); } else if (!InstantiationUtil.isProperClass(clazz)) { @@ -159,18 +171,36 @@ public class UserDefinedFunctionHelper { } /** - * Check whether this is a Scala object. It is forbidden to use {@link TableFunction} implemented - * by a Scala object, since concurrent risks. + * Check whether this is a Scala object. Using Scala objects can lead to concurrency issues, + * e.g., due to a shared collector. */ - public static void validateNotSingleton(Class<?> clazz) { + private static void validateNotSingleton(Class<?> clazz) { // TODO it is not a good way to check singleton. Maybe improve it further. if (Arrays.stream(clazz.getFields()).anyMatch(f -> f.getName().equals("MODULE$"))) { throw new ValidationException(String.format( - "TableFunction implemented by class %s is a Scala object. This is forbidden because of concurrency" + + "Function implemented by class %s is a Scala object. This is forbidden because of concurrency" + " problems when using them.", clazz.getCanonicalName())); } } + /** + * Modifies a function instance by removing any reference to outer classes. This enables + * non-static inner function classes. 
+ */ + private static void cleanFunction(TableConfig config, UserDefinedFunction function) { + final ClosureCleanerLevel level = config.getConfiguration().get(PipelineOptions.CLOSURE_CLEANER_LEVEL); + try { + ClosureCleaner.clean(function, level, true); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Function class '%s' is not serializable. Make sure that the class is self-contained " + + "(i.e. no references to outer classes) and all inner fields are serializable as well.", + function.getClass()), + t); + } + } + private UserDefinedFunctionHelper() { } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java index a27aeba..05fa8b3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.GroupWindow; import org.apache.flink.table.api.OverWindow; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.FunctionLookup; @@ -85,6 +86,7 @@ import static org.apache.flink.table.operations.SetQueryOperation.SetQueryOperat @Internal public final class OperationTreeBuilder { + private final TableConfig config; private final FunctionLookup functionCatalog; private final TableReferenceLookup tableReferenceLookup; private final LookupCallResolver lookupResolver; @@ -100,6 +102,7 @@ public final class OperationTreeBuilder { private final JoinOperationFactory joinOperationFactory; private 
OperationTreeBuilder( + TableConfig config, FunctionLookup functionLookup, TableReferenceLookup tableReferenceLookup, ProjectionOperationFactory projectionOperationFactory, @@ -108,6 +111,7 @@ public final class OperationTreeBuilder { SetOperationFactory setOperationFactory, AggregateOperationFactory aggregateOperationFactory, JoinOperationFactory joinOperationFactory) { + this.config = config; this.functionCatalog = functionLookup; this.tableReferenceLookup = tableReferenceLookup; this.projectionOperationFactory = projectionOperationFactory; @@ -120,10 +124,12 @@ public final class OperationTreeBuilder { } public static OperationTreeBuilder create( + TableConfig config, FunctionLookup functionCatalog, TableReferenceLookup tableReferenceLookup, boolean isStreamingMode) { return new OperationTreeBuilder( + config, functionCatalog, tableReferenceLookup, new ProjectionOperationFactory(), @@ -168,7 +174,11 @@ public final class OperationTreeBuilder { boolean explicitAlias, List<OverWindow> overWindows) { - ExpressionResolver resolver = ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, child) + ExpressionResolver resolver = ExpressionResolver.resolverFor( + config, + tableReferenceLookup, + functionCatalog, + child) .withOverWindows(overWindows) .build(); List<ResolvedExpression> projections = resolver.resolve(projectList); @@ -233,6 +243,7 @@ public final class OperationTreeBuilder { ResolvedGroupWindow resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver); ExpressionResolver resolverWithWindowReferences = ExpressionResolver.resolverFor( + config, tableReferenceLookup, functionCatalog, child) @@ -282,9 +293,10 @@ public final class OperationTreeBuilder { // Step3: resolve expressions, including grouping, aggregates and window properties. 
ResolvedGroupWindow resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver); ExpressionResolver resolverWithWindowReferences = ExpressionResolver.resolverFor( - tableReferenceLookup, - functionCatalog, - child) + config, + tableReferenceLookup, + functionCatalog, + child) .withLocalReferences( new LocalReferenceExpression( resolvedWindow.getAlias(), @@ -327,7 +339,12 @@ public final class OperationTreeBuilder { JoinType joinType, Optional<Expression> condition, boolean correlated) { - ExpressionResolver resolver = ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, left, right) + ExpressionResolver resolver = ExpressionResolver.resolverFor( + config, + tableReferenceLookup, + functionCatalog, + left, + right) .build(); Optional<ResolvedExpression> resolvedCondition = condition.map(expr -> resolveSingleExpression(expr, resolver)); @@ -356,6 +373,7 @@ public final class OperationTreeBuilder { public Expression resolveExpression(Expression expression, QueryOperation... 
tableOperation) { ExpressionResolver resolver = ExpressionResolver.resolverFor( + config, tableReferenceLookup, functionCatalog, tableOperation).build(); @@ -660,6 +678,7 @@ public final class OperationTreeBuilder { ResolvedGroupWindow resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver); ExpressionResolver resolverWithWindowReferences = ExpressionResolver.resolverFor( + config, tableReferenceLookup, functionCatalog, child) @@ -758,7 +777,12 @@ public final class OperationTreeBuilder { } private ExpressionResolver getResolver(QueryOperation child) { - return ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, child).build(); + return ExpressionResolver.resolverFor( + config, + tableReferenceLookup, + functionCatalog, + child) + .build(); } private static class NoWindowPropertyChecker extends ApiExpressionDefaultVisitor<Void> { diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java index 05be6d3..bdf64fb 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.catalog; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; @@ -61,6 +62,7 @@ public class FunctionCatalogTest { catalog = new GenericInMemoryCatalog(testCatalogName); moduleManager = new ModuleManager(); functionCatalog = new FunctionCatalog( + TableConfig.getDefault(), new CatalogManager(testCatalogName, catalog), moduleManager); } diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java index fa2c58c..57af810 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java @@ -65,14 +65,15 @@ public class TableEnvironmentMock extends TableEnvironmentImpl { } private static TableEnvironmentMock getInstance(boolean isStreamingMode) { + final TableConfig config = createTableConfig(); final CatalogManager catalogManager = createCatalogManager(); final ModuleManager moduleManager = new ModuleManager(); return new TableEnvironmentMock( catalogManager, moduleManager, - createTableConfig(), + config, createExecutor(), - createFunctionCatalog(catalogManager, moduleManager), + createFunctionCatalog(config, catalogManager, moduleManager), createPlanner(), isStreamingMode); } @@ -93,8 +94,11 @@ public class TableEnvironmentMock extends TableEnvironmentImpl { return new ExecutorMock(); } - private static FunctionCatalog createFunctionCatalog(CatalogManager catalogManager, ModuleManager moduleManager) { - return new FunctionCatalog(catalogManager, moduleManager); + private static FunctionCatalog createFunctionCatalog( + TableConfig config, + CatalogManager catalogManager, + ModuleManager moduleManager) { + return new FunctionCatalog(config, catalogManager, moduleManager); } private static PlannerMock createPlanner() { diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala index fcddda9..5913d8a 100644 --- 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala @@ -315,7 +315,7 @@ object StreamTableEnvironmentImpl { new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName)) val moduleManager = new ModuleManager - val functionCatalog = new FunctionCatalog(catalogManager, moduleManager) + val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager) val executorProperties = settings.toExecutorProperties val executor = lookupExecutor(executorProperties, executionEnvironment) diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala index e238ddc..48bebee 100644 --- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala +++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala @@ -81,6 +81,7 @@ class StreamTableEnvironmentImplTest { private def getStreamTableEnvironment( env: StreamExecutionEnvironment, elements: DataStream[Int]) = { + val config = new TableConfig val catalogManager = new CatalogManager( "cat", new GenericInMemoryCatalog("cat", "db")) @@ -88,8 +89,8 @@ class StreamTableEnvironmentImplTest { new StreamTableEnvironmentImpl( catalogManager, moduleManager, - new FunctionCatalog(catalogManager, moduleManager), - new TableConfig, + new FunctionCatalog(config, catalogManager, moduleManager), + config, env, new TestPlanner(elements.javaStream.getTransformation), new ExecutorMock, diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java index b06c55a..df01db0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java @@ -49,6 +49,10 @@ public final class AggregateFunctionDefinition implements FunctionDefinition { this.accumulatorTypeInfo = Preconditions.checkNotNull(accTypeInfo); } + public String getName() { + return name; + } + public AggregateFunction<?, ?> getAggregateFunction() { return aggregateFunction; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java index fc365eb..279d436 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java @@ -40,6 +40,10 @@ public final class ScalarFunctionDefinition implements FunctionDefinition { this.scalarFunction = Preconditions.checkNotNull(scalarFunction); } + public String getName() { + return name; + } + public ScalarFunction getScalarFunction() { return scalarFunction; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java index 03c4e33..d5c4f1c 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java +++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java @@ -49,6 +49,10 @@ public final class TableAggregateFunctionDefinition implements FunctionDefinitio this.accumulatorTypeInfo = Preconditions.checkNotNull(accTypeInfo); } + public String getName() { + return name; + } + public TableAggregateFunction<?, ?> getTableAggregateFunction() { return aggregateFunction; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java index 0dcf569..8a6392f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java @@ -46,6 +46,10 @@ public final class TableFunctionDefinition implements FunctionDefinition { this.resultType = Preconditions.checkNotNull(resultType); } + public String getName() { + return name; + } + public TableFunction<?> getTableFunction() { return tableFunction; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java index 3565a77..a4c3261 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java @@ -43,7 +43,7 @@ public abstract class UserDefinedFunction implements FunctionDefinition, Seriali */ public final String functionIdentifier() { final String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this))); - return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5); + return 
getClass().getName().replace('.', '$').concat("$").concat(md5); } /** diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java index c2ef623..61cc347 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java @@ -45,6 +45,7 @@ public class CallExpressionResolver { .build() .getCluster().getPlanner().getContext().unwrap(FlinkContext.class); this.resolver = ExpressionResolver.resolverFor( + context.getTableConfig(), name -> Optional.empty(), context.getFunctionCatalog()).build(); } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala index c63bef5..cc7d681 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala @@ -614,7 +614,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) { function: UserDefinedFunction, functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext], contextTerm: String = null): String = { - val classQualifier = function.getClass.getCanonicalName + val classQualifier = function.getClass.getName val fieldTerm = CodeGenUtils.udfFieldName(function) addReusableObjectInternal(function, fieldTerm, classQualifier) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala index b4aeb41..b1bf10a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala @@ -200,10 +200,6 @@ case class PlannerTableFunctionCall( override private[flink] def children: Seq[PlannerExpression] = parameters override def validateInput(): ValidationResult = { - // check if not Scala object - UserDefinedFunctionHelper.validateNotSingleton(tableFunction.getClass) - // check if class could be instantiated - UserDefinedFunctionHelper.validateInstantiation(tableFunction.getClass) // look for a signature that matches the input types val signature = parameters.map(_.resultType).map(fromLegacyInfoToDataType) val foundMethod = getUserDefinedMethod( diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index c5bdef7..bf3567b 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -89,7 +89,10 @@ public class SqlToOperationConverterTest { private final CatalogManager catalogManager = new CatalogManager("builtin", catalog); private final ModuleManager moduleManager = new ModuleManager(); - private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager); + private final FunctionCatalog functionCatalog = new FunctionCatalog( + tableConfig, + catalogManager, + moduleManager); private final PlannerContext plannerContext = new 
PlannerContext(tableConfig, functionCatalog, diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala index d42db1b..5e101d5 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala @@ -45,11 +45,12 @@ import java.util.Collections class WatermarkGeneratorCodeGenTest { // mock FlinkPlannerImpl to avoid discovering TableEnvironment and Executor. + val config = new TableConfig val catalog = new GenericInMemoryCatalog("MockCatalog", "default") val catalogManager = new CatalogManager("builtin", catalog) - val functionCatalog = new FunctionCatalog(catalogManager, new ModuleManager) + val functionCatalog = new FunctionCatalog(config, catalogManager, new ModuleManager) val plannerContext = new PlannerContext( - new TableConfig, + config, functionCatalog, catalogManager, asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)), diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala index e248e49..ce25e6a 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala @@ -84,7 +84,8 @@ class AggCallSelectivityEstimatorTest { val planner = mock(classOf[AbstractRelOptPlanner]) val 
catalogManager = mock(classOf[CatalogManager]) val moduleManager = mock(classOf[ModuleManager]) - val functionCatalog = new FunctionCatalog(catalogManager, moduleManager) + val config = new TableConfig + val functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager) val context = new FlinkContextImpl(new TableConfig, functionCatalog, catalogManager) when(tableScan, "getCluster").thenReturn(cluster) when(cluster, "getRexBuilder").thenReturn(rexBuilder) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index aa3d48f..18cd61a 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -91,7 +91,7 @@ class FlinkRelMdHandlerTestBase { val plannerContext: PlannerContext = new PlannerContext( tableConfig, - new FunctionCatalog(catalogManager, moduleManager), + new FunctionCatalog(tableConfig, catalogManager, moduleManager), catalogManager, CalciteSchema.from(rootSchema), util.Arrays.asList( diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala index f89332b..6de6fa5 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala @@ -88,7 +88,7 @@ class SelectivityEstimatorTest { val planner = 
mock(classOf[AbstractRelOptPlanner]) val catalogManager = mock(classOf[CatalogManager]) val moduleManager = mock(classOf[ModuleManager]) - val functionCatalog = new FunctionCatalog(catalogManager, moduleManager) + val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager) val context: FlinkContext = new FlinkContextImpl(tableConfig, functionCatalog, catalogManager) when(tableScan, "getCluster").thenReturn(cluster) when(cluster, "getRexBuilder").thenReturn(rexBuilder) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala index 35942d8..62c8e9f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.utils import org.apache.flink.api.common.typeinfo.Types -import org.apache.flink.table.api.DataTypes +import org.apache.flink.table.api.{DataTypes, TableConfig} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral} import org.apache.flink.table.expressions.{Expression, ExpressionParser} @@ -56,7 +56,10 @@ class RexNodeExtractorTest extends RexNodeTestBase { val catalogManager = new CatalogManager( defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database")) val moduleManager = new ModuleManager - private val functionCatalog = new FunctionCatalog(catalogManager, moduleManager) + private val functionCatalog = new FunctionCatalog( + TableConfig.getDefault, + catalogManager, + moduleManager) private val 
expressionBridge: ExpressionBridge[PlannerExpression] = new ExpressionBridge[PlannerExpression]( diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala index 573274c..278a2b0 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala @@ -19,14 +19,14 @@ package org.apache.flink.table.planner.runtime.stream.table import org.apache.flink.api.scala._ +import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.ScalarFunction -import org.apache.flink.table.planner.expressions.utils.{Func1, Func13, Func23, Func24, Func25, RichFunc1, RichFunc2} +import org.apache.flink.table.planner.expressions.utils._ import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, TestingRetractSink, UserDefinedFunctionTestUtils} import org.apache.flink.types.Row - import org.junit.Assert._ import org.junit._ import org.junit.runner.RunWith @@ -294,6 +294,79 @@ class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode } @Test + def testInlineScalarFunction(): Unit = { + val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a) + + val sink = new TestingAppendSink + val result = t.select( + (new ScalarFunction() { + def eval(i: Int, suffix: String): String = { + suffix + i + } + })('a, ">>")) + result.toAppendStream[Row].addSink(sink) + env.execute() + + val expected = mutable.MutableList( + ">>1", 
+ ">>2", + ">>3", + ">>4" + ) + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + @Test + def testNonStaticObjectScalarFunction(): Unit = { + val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a) + + val sink = new TestingAppendSink + val result = t.select(NonStaticObjectScalarFunction('a, ">>")) + + result.toAppendStream[Row].addSink(sink) + env.execute() + + val expected = mutable.MutableList( + ">>1", + ">>2", + ">>3", + ">>4" + ) + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + object NonStaticObjectScalarFunction extends ScalarFunction { + def eval(i: Int, suffix: String): String = { + suffix + i + } + } + + @Test(expected = classOf[ValidationException]) // see FLINK-15162 + def testNonStaticClassScalarFunction(): Unit = { + val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a) + + val sink = new TestingAppendSink + val result = t.select(new NonStaticClassScalarFunction()('a, ">>")) + + result.toAppendStream[Row].addSink(sink) + env.execute() + + val expected = mutable.MutableList( + ">>1", + ">>2", + ">>3", + ">>4" + ) + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + class NonStaticClassScalarFunction extends ScalarFunction { + def eval(i: Int, suffix: String): String = { + suffix + i + } + } + + @Test def testMapType(): Unit = { val ds = env.fromCollection(tupleData3).toTable(tEnv).select(map('_1, '_3)) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index c65f5c7..16edbbf 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -1087,7 +1087,7 @@ object TestingTableEnvironment { settings.getBuiltInCatalogName, 
settings.getBuiltInDatabaseName)) } val moduleManager = new ModuleManager - val functionCatalog = new FunctionCatalog(catalogMgr, moduleManager) + val functionCatalog = new FunctionCatalog(tableConfig, catalogMgr, moduleManager) val plannerProperties = settings.toPlannerProperties val executorProperties = settings.toExecutorProperties val executor = ComponentFactoryService.find(classOf[ExecutorFactory], diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 6b21ca9..82f5091 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -60,7 +60,7 @@ abstract class TableEnvImpl( // Table API/SQL function catalog private[flink] val functionCatalog: FunctionCatalog = - new FunctionCatalog(catalogManager, moduleManager) + new FunctionCatalog(config, catalogManager, moduleManager) // temporary utility until we don't use planner expressions anymore functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE) @@ -90,6 +90,7 @@ abstract class TableEnvImpl( } private[flink] val operationTreeBuilder = OperationTreeBuilder.create( + config, functionCatalog, tableLookup, isStreamingMode) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 3bbbc65..9925f0d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -1649,7 +1649,7 @@ abstract class CodeGenerator( function: UserDefinedFunction, contextTerm: 
String = null, functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext]): String = { - val classQualifier = function.getClass.getCanonicalName + val classQualifier = function.getClass.getName val functionSerializedData = EncodingUtils.encodeObjectToString(function) val fieldTerm = s"function_${function.functionIdentifier}" diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala index b40053a..fc3d263 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala @@ -298,10 +298,6 @@ case class PlannerTableFunctionCall( override private[flink] def children: Seq[PlannerExpression] = parameters override def validateInput(): ValidationResult = { - // check if not Scala object - UserDefinedFunctionHelper.validateNotSingleton(tableFunction.getClass) - // check if class could be instantiated - UserDefinedFunctionHelper.validateInstantiation(tableFunction.getClass) // look for a signature that matches the input types val signature = parameters.map(_.resultType) val foundMethod = getUserDefinedMethod(tableFunction, "eval", typeInfoToClass(signature)) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala index 334fcb3..44b100f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala @@ -23,7 +23,7 @@ import java.util import 
org.apache.calcite.plan.RelOptRule.{none, operand} import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import org.apache.calcite.rex.RexProgram -import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} import org.apache.flink.table.expressions.{Expression, PlannerExpression} import org.apache.flink.table.module.ModuleManager @@ -74,7 +74,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule( RexProgramExtractor.extractConjunctiveConditions( program, call.builder().getRexBuilder, - new FunctionCatalog(catalogManager, new ModuleManager)) + new FunctionCatalog(TableConfig.getDefault, catalogManager, new ModuleManager)) if (predicates.isEmpty) { // no condition can be translated to expression return diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java index f5d9231..7609994 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java @@ -86,7 +86,10 @@ public class SqlToOperationConverterTest { private final CatalogManager catalogManager = new CatalogManager("builtin", catalog); private final ModuleManager moduleManager = new ModuleManager(); - private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager); + private final FunctionCatalog functionCatalog = new FunctionCatalog( + tableConfig, + catalogManager, + moduleManager); private final PlanningConfigurationBuilder planningConfigurationBuilder = new PlanningConfigurationBuilder(tableConfig, functionCatalog, diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala index a3450f4..b666039 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala @@ -208,7 +208,7 @@ class StreamTableEnvironmentTest extends TableTestBase { new GenericInMemoryCatalog("default_catalog", "default_database")) val moduleManager: ModuleManager = new ModuleManager val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv) - val functionCatalog = new FunctionCatalog(manager, moduleManager) + val functionCatalog = new FunctionCatalog(config, manager, moduleManager) val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager) val jTEnv = new JStreamTableEnvironmentImpl( manager, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala index 25f875f..95b57f8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala @@ -68,15 +68,16 @@ class AggregateTest extends TableTestBase { @Test def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = { val defaultCatalog = "default_catalog" + val config = new TableConfig val catalogManager = new CatalogManager( defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database")) val moduleManager = new ModuleManager - val functionCatalog = new FunctionCatalog(catalogManager, moduleManager) + val functionCatalog = new 
FunctionCatalog(config, catalogManager, moduleManager) val tablEnv = new StreamTableEnvironmentImpl( catalogManager, moduleManager, functionCatalog, - new TableConfig, + config, Mockito.mock(classOf[StreamExecutionEnvironment]), Mockito.mock(classOf[Planner]), Mockito.mock(classOf[Executor]), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala index d109726..1f0da00 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala @@ -28,6 +28,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR} import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.util.{DateString, TimeString, TimestampString} +import org.apache.flink.table.api.TableConfig import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} import org.apache.flink.table.expressions._ import org.apache.flink.table.module.ModuleManager @@ -43,6 +44,7 @@ import scala.collection.mutable class RexProgramExtractorTest extends RexProgramTestBase { private val functionCatalog: FunctionCatalog = new FunctionCatalog( + TableConfig.getDefault, new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog")), new ModuleManager ) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala index 521b139..912f23b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala @@ -20,8 +20,10 @@ package org.apache.flink.table.runtime.stream.table import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils._ +import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, UserDefinedFunctionTestUtils} import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row @@ -328,6 +330,92 @@ class CalcITCase extends AbstractTestBase { } @Test + def testInlineScalarFunction(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = StreamTableEnvironment.create(env) + + StreamITCase.testResults = mutable.MutableList() + + val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a) + + val result = t.select( + (new ScalarFunction() { + def eval(i: Int, suffix: String): String = { + suffix + i + } + })('a, ">>")) + + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = mutable.MutableList( + ">>1", + ">>2", + ">>3", + ">>4" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testNonStaticObjectScalarFunction(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = StreamTableEnvironment.create(env) + + StreamITCase.testResults = mutable.MutableList() + + val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a) + + val result = t.select(NonStaticObjectScalarFunction('a, ">>")) + + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = mutable.MutableList( + ">>1", + ">>2", + ">>3", + ">>4" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + object NonStaticObjectScalarFunction extends 
ScalarFunction { + def eval(i: Int, suffix: String): String = { + suffix + i + } + } + + @Test(expected = classOf[ValidationException]) // see FLINK-15162 + def testNonStaticClassScalarFunction(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = StreamTableEnvironment.create(env) + + StreamITCase.testResults = mutable.MutableList() + + val t = env.fromElements(1, 2, 3, 4).toTable(tEnv).as('a) + + val result = t.select(new NonStaticClassScalarFunction()('a, ">>")) + + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = mutable.MutableList( + ">>1", + ">>2", + ">>3", + ">>4" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + class NonStaticClassScalarFunction extends ScalarFunction { + def eval(i: Int, suffix: String): String = { + suffix + i + } + } + + @Test def testMapType(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala index 5ebc79e..abe6f80 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala @@ -332,7 +332,7 @@ case class StreamTableTestUtil( private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager()) private val moduleManager: ModuleManager = new ModuleManager private val executor: StreamExecutor = new StreamExecutor(javaEnv) - private val functionCatalog = new FunctionCatalog(manager, moduleManager) + private val functionCatalog = new FunctionCatalog(tableConfig, manager, moduleManager) private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager) val javaTableEnv = new JavaStreamTableEnvironmentImpl(
