This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d71c4ee  [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
d71c4ee is described below

commit d71c4eed25cbd7d183cafb29e8bfbb17df904cbd
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Fri Jul 5 15:37:37 2019 -0700

    [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
    
    This PR integrates FunctionCatalog with Catalog APIs.
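    
    With this change, a FunctionCatalog is constructed from the CatalogManager
    rather than from raw catalog/database names. A minimal sketch of the new
    wiring, mirroring the TableEnvironmentImpl changes below (the built-in
    catalog/database names are shown only for illustration):
    
        CatalogManager catalogManager = new CatalogManager(
            "default_catalog",
            new GenericInMemoryCatalog("default_catalog", "default_database"));
        FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);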
    
    This closes #8920.
---
 .../batch/connectors/hive/HiveTableFactory.java    | 106 ++++++++++++++++++-
 .../java/internal/StreamTableEnvironmentImpl.java  |   5 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   5 +-
 .../apache/flink/table/catalog/CatalogManager.java |  31 ++++--
 .../flink/table/catalog/FunctionCatalog.java       | 114 +++++++++++++++------
 .../internal/StreamTableEnvironmentImpl.scala      |   5 +-
 .../org/apache/flink/table/catalog/Catalog.java    |   4 +-
 .../table/factories/FunctionDefinitionFactory.java |  38 +++++++
 .../metadata/AggCallSelectivityEstimatorTest.scala |   7 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  11 +-
 .../plan/metadata/SelectivityEstimatorTest.scala   |   7 +-
 .../table/plan/util/RexNodeExtractorTest.scala     |  17 +--
 .../apache/flink/table/util/TableTestBase.scala    |   3 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |   5 +-
 .../PushFilterIntoTableSourceScanRule.scala        |   8 +-
 .../api/stream/StreamTableEnvironmentTest.scala    |   2 +-
 .../flink/table/api/stream/sql/AggregateTest.scala |  10 +-
 .../flink/table/plan/RexProgramExtractorTest.scala |   5 +-
 .../apache/flink/table/utils/TableTestBase.scala   |   3 +-
 19 files changed, 296 insertions(+), 90 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
index a22014a..9c8beaa 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
@@ -18,13 +18,28 @@
 
 package org.apache.flink.batch.connectors.hive;
 
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.functions.TableFunctionDefinition;
+import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
+import org.apache.flink.table.functions.hive.HiveGenericUDAF;
+import org.apache.flink.table.functions.hive.HiveGenericUDF;
+import org.apache.flink.table.functions.hive.HiveGenericUDTF;
+import org.apache.flink.table.functions.hive.HiveSimpleUDF;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
@@ -33,7 +48,15 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
@@ -41,13 +64,19 @@ import java.util.Map;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A table factory implementation for tables stored in Hive catalog.
+ * A table factory implementation for the Hive catalog.
  */
-public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Row> {
-       private HiveConf hiveConf;
+public class HiveTableFactory
+               implements TableSourceFactory<Row>, TableSinkFactory<Row>, FunctionDefinitionFactory {
+       private static final Logger LOG = LoggerFactory.getLogger(HiveTableFactory.class);
+
+       private final HiveConf hiveConf;
+       private final String hiveVersion;
 
        public HiveTableFactory(HiveConf hiveConf) {
                this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
+
+               this.hiveVersion = new JobConf(hiveConf).get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
        }
 
        @Override
@@ -112,4 +141,75 @@ public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFacto
                return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
        }
 
+       @Override
+       public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
+               String functionClassName = catalogFunction.getClassName();
+
+               if (Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC))) {
+                       throw new TableException(
+                               String.format("HiveFunctionDefinitionFactory does not support generic functions %s yet", name));
+               }
+
+               Class clazz;
+               try {
+                       clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
+
+                       LOG.info("Successfully loaded Hive udf '{}' with class '{}'", name, functionClassName);
+               } catch (ClassNotFoundException e) {
+                       throw new TableException(
+                               String.format("Failed to load class %s.", functionClassName), e);
+               }
+
+               if (UDF.class.isAssignableFrom(clazz)) {
+                       LOG.info("Transforming Hive function '{}' into a HiveSimpleUDF", name);
+
+                       return new ScalarFunctionDefinition(
+                               name,
+                               new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName))
+                       );
+               } else if (GenericUDF.class.isAssignableFrom(clazz)) {
+                       LOG.info("Transforming Hive function '{}' into a HiveGenericUDF", name);
+
+                       return new ScalarFunctionDefinition(
+                               name,
+                               new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName))
+                       );
+               } else if (GenericUDTF.class.isAssignableFrom(clazz)) {
+                       LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name);
+
+                       HiveGenericUDTF udtf = new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName));
+
+                       return new TableFunctionDefinition(
+                               name,
+                               udtf,
+                               GenericTypeInfo.of(Row.class)
+                       );
+               } else if (GenericUDAFResolver2.class.isAssignableFrom(clazz) || UDAF.class.isAssignableFrom(clazz)) {
+                       HiveGenericUDAF udaf;
+
+                       if (GenericUDAFResolver2.class.isAssignableFrom(clazz)) {
+                               LOG.info(
+                                       "Transforming Hive function '{}' into a HiveGenericUDAF with no UDAF bridging and Hive version {}",
+                                       name, hiveVersion);
+
+                               udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), false, hiveVersion);
+                       } else {
+                               LOG.info(
+                                       "Transforming Hive function '{}' into a HiveGenericUDAF with UDAF bridging and Hive version {}",
+                                       name, hiveVersion);
+
+                               udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), true, hiveVersion);
+                       }
+
+                       return new AggregateFunctionDefinition(
+                               name,
+                               udaf,
+                               GenericTypeInfo.of(Object.class),
+                               GenericTypeInfo.of(GenericUDAFEvaluator.AggregationBuffer.class)
+                       );
+               } else {
+                       throw new IllegalArgumentException(
+                               String.format("HiveFunctionDefinitionFactory cannot instantiate FunctionDefinition for class %s", functionClassName));
+               }
+       }
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 86cd203..4d0586c 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -102,13 +102,12 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
                        EnvironmentSettings settings,
                        TableConfig tableConfig) {
 
-               FunctionCatalog functionCatalog = new FunctionCatalog(
-                       settings.getBuiltInCatalogName(),
-                       settings.getBuiltInDatabaseName());
                CatalogManager catalogManager = new CatalogManager(
                        settings.getBuiltInCatalogName(),
                        new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
 
+               FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+
                Map<String, String> executorProperties = settings.toExecutorProperties();
                Executor executor = lookupExecutor(executorProperties, executionEnvironment);
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 988fa88..f2aea62 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -123,13 +123,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
        public static TableEnvironmentImpl create(EnvironmentSettings settings) {
 
-               FunctionCatalog functionCatalog = new FunctionCatalog(
-                       settings.getBuiltInCatalogName(),
-                       settings.getBuiltInDatabaseName());
                CatalogManager catalogManager = new CatalogManager(
                        settings.getBuiltInCatalogName(),
                        new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
 
+               FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+
                Map<String, String> executorProperties = settings.toExecutorProperties();
                Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
                        .create(executorProperties);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 0f83176..c5d0bc7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -59,11 +59,14 @@ public class CatalogManager {
        // TO BE REMOVED along with ExternalCatalog API
        private Map<String, ExternalCatalog>  externalCatalogs;
 
-       // The name of the default catalog and schema
+       // The name of the current catalog and database
        private String currentCatalogName;
 
        private String currentDatabaseName;
 
+       // The name of the default catalog
+       private final String defaultCatalogName;
+
        /**
         * Temporary solution to handle both {@link CatalogBaseTable} and
         * {@link ExternalCatalogTable} in a single call.
@@ -125,6 +128,7 @@ public class CatalogManager {
                catalogs.put(defaultCatalogName, defaultCatalog);
                this.currentCatalogName = defaultCatalogName;
                this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
+               this.defaultCatalogName = defaultCatalogName;
        }
 
        /**
@@ -215,9 +219,9 @@ public class CatalogManager {
        }
 
        /**
-        * Gets the current default catalog that will be used when resolving table path.
+        * Gets the current catalog that will be used when resolving table path.
         *
-        * @return the current default catalog
+        * @return the current catalog
         * @see CatalogManager#resolveTable(String...)
         */
        public String getCurrentCatalog() {
@@ -225,9 +229,9 @@ public class CatalogManager {
        }
 
        /**
-        * Sets the current default catalog name that will be used when resolving table path.
+        * Sets the current catalog name that will be used when resolving table path.
         *
-        * @param catalogName catalog name to set as current default catalog
+        * @param catalogName catalog name to set as current catalog
         * @throws CatalogNotExistException thrown if the catalog doesn't exist
         * @see CatalogManager#resolveTable(String...)
         */
@@ -255,9 +259,9 @@ public class CatalogManager {
        }
 
        /**
-        * Gets the current default database name that will be used when resolving table path.
+        * Gets the current database name that will be used when resolving table path.
         *
-        * @return the current default database
+        * @return the current database
         * @see CatalogManager#resolveTable(String...)
         */
        public String getCurrentDatabase() {
@@ -265,10 +269,10 @@ public class CatalogManager {
        }
 
        /**
-        * Sets the current default database name that will be used when resolving a table path.
+        * Sets the current database name that will be used when resolving a table path.
         * The database has to exist in the current catalog.
         *
-        * @param databaseName database name to set as current default database name
+        * @param databaseName database name to set as current database name
         * @throws CatalogException thrown if the database doesn't exist in the current catalog
         * @see CatalogManager#resolveTable(String...)
         * @see CatalogManager#setCurrentCatalog(String)
@@ -294,6 +298,15 @@ public class CatalogManager {
        }
 
        /**
+        * Gets the default catalog name.
+        *
+        * @return the default catalog
+        */
+       public String getDefaultCatalogName() {
+               return defaultCatalogName;
+       }
+
+       /**
+        * Tries to resolve a table path to a {@link ResolvedTable}. The algorithm looks for requested table
         * in the following paths in that order:
         * <ol>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 0d18ffe..af7f21f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -21,7 +21,10 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
@@ -36,21 +39,26 @@ import org.apache.flink.table.functions.UserDefinedAggregateFunction;
 import org.apache.flink.table.functions.UserFunctionsTypeHelper;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Simple function catalog to store {@link FunctionDefinition}s in memory.
+ * Simple function catalog to store {@link FunctionDefinition}s in catalogs.
  */
 @Internal
 public class FunctionCatalog implements FunctionLookup {
 
-       private final String defaultCatalogName;
-
-       private final String defaultDatabaseName;
+       private final CatalogManager catalogManager;
 
+       // For simplicity, currently hold registered Flink functions in memory here
+       // TODO: should move to catalog
        private final Map<String, FunctionDefinition> userFunctions = new LinkedHashMap<>();
 
        /**
@@ -58,11 +66,8 @@ public class FunctionCatalog implements FunctionLookup {
         */
        private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
 
-       public FunctionCatalog(
-                       String defaultCatalogName,
-                       String defaultDatabaseName) {
-               this.defaultCatalogName = defaultCatalogName;
-               this.defaultDatabaseName = defaultDatabaseName;
+       public FunctionCatalog(CatalogManager catalogManager) {
+               this.catalogManager = checkNotNull(catalogManager);
        }
 
        public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
@@ -129,35 +134,81 @@ public class FunctionCatalog implements FunctionLookup {
        }
 
        public String[] getUserDefinedFunctions() {
-               return userFunctions.values().stream()
-                       .map(FunctionDefinition::toString)
-                       .toArray(String[]::new);
+               List<String> result = new ArrayList<>();
+
+               // Get functions in catalog
+               Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
+               try {
+                       result.addAll(catalog.listFunctions(catalogManager.getCurrentDatabase()));
+               } catch (DatabaseNotExistException e) {
+                       // Ignore since there will always be a current database of the current catalog
+               }
+
+               // Get functions registered in memory
+               result.addAll(
+                       userFunctions.values().stream()
+                               .map(FunctionDefinition::toString)
+                               .collect(Collectors.toList()));
+
+               return result.stream()
+                       .collect(Collectors.toList())
+                       .toArray(new String[0]);
        }
 
        @Override
        public Optional<FunctionLookup.Result> lookupFunction(String name) {
-               final FunctionDefinition userCandidate = userFunctions.get(normalizeName(name));
-               final Optional<FunctionDefinition> foundDefinition;
-               if (userCandidate != null) {
-                       foundDefinition = Optional.of(userCandidate);
-               } else {
+               String functionName = normalizeName(name);
 
-                       // TODO once we connect this class with the Catalog APIs we need to make sure that
-                       //  built-in functions are present in "root" built-in catalog. This allows to
-                       //  overwrite built-in functions but also fallback to the "root" catalog. It should be
-                       //  possible to disable the "root" catalog if that is desired.
+               FunctionDefinition userCandidate = null;
 
-                       foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
-                               .stream()
-                               .filter(f -> normalizeName(name).equals(normalizeName(f.getName())))
-                               .findFirst()
-                               .map(Function.identity());
-               }
+               Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
 
-               return foundDefinition.map(definition -> new FunctionLookup.Result(
-                       ObjectIdentifier.of(defaultCatalogName, defaultDatabaseName, name),
-                       definition)
-               );
+               if (catalog.getTableFactory().isPresent() &&
+                               catalog.getTableFactory().get() instanceof FunctionDefinitionFactory) {
+                       try {
+                               CatalogFunction catalogFunction = catalog.getFunction(
+                                       new ObjectPath(catalogManager.getCurrentDatabase(), functionName));
+
+                               FunctionDefinitionFactory factory = (FunctionDefinitionFactory) catalog.getTableFactory().get();
+
+                               userCandidate = factory.createFunctionDefinition(functionName, catalogFunction);
+                       } catch (FunctionNotExistException e) {
+                               // Ignore
+                       }
+
+                       return Optional.ofNullable(userCandidate)
+                               .map(definition -> new FunctionLookup.Result(
+                                       ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name),
+                                       definition)
+                               );
+               } else {
+                       // Check in-memory functions
+                       userCandidate = userFunctions.get(functionName);
+
+                       final Optional<FunctionDefinition> foundDefinition;
+                       if (userCandidate != null) {
+                               foundDefinition = Optional.of(userCandidate);
+                       } else {
+
+                               // TODO once we connect this class with the Catalog APIs we need to make sure that
+                               //  built-in functions are present in "root" built-in catalog. This allows to
+                               //  overwrite built-in functions but also fallback to the "root" catalog. It should be
+                               //  possible to disable the "root" catalog if that is desired.
+
+                               foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
+                                       .stream()
+                                       .filter(f -> functionName.equals(normalizeName(f.getName())))
+                                       .findFirst()
+                                       .map(Function.identity());
+                       }
+
+                       String defaultCatalogName = catalogManager.getDefaultCatalogName();
+
+                       return foundDefinition.map(definition -> new FunctionLookup.Result(
+                               ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
+                               definition)
+                       );
+               }
        }
 
        @Override
@@ -169,6 +220,7 @@ public class FunctionCatalog implements FunctionLookup {
        }
 
        private void registerFunction(String name, FunctionDefinition functionDefinition) {
+               // TODO: should register to catalog
                userFunctions.put(normalizeName(name), functionDefinition);
        }
 
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index c2b9831..1304c8c 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -275,13 +275,12 @@ object StreamTableEnvironmentImpl {
       tableConfig: TableConfig)
     : StreamTableEnvironmentImpl = {
 
-    val functionCatalog = new FunctionCatalog(
-      settings.getBuiltInCatalogName,
-      settings.getBuiltInDatabaseName)
     val catalogManager = new CatalogManager(
       settings.getBuiltInCatalogName,
      new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
 
+    val functionCatalog = new FunctionCatalog(catalogManager)
+
     val executorProperties = settings.toExecutorProperties
     val executor = lookupExecutor(executorProperties, executionEnvironment)
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index dbb5c1b..8019ab0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -47,8 +47,8 @@ import java.util.Optional;
 public interface Catalog {
 
        /**
-        * Get an optional {@link TableFactory} instance that's responsible for generating source/sink for tables
-        * stored in this catalog.
+        * Get an optional {@link TableFactory} instance that's responsible for generating table-related
+        * instances stored in this catalog, such as source/sink and function definitions.
         *
         * @return an optional TableFactory instance
         */
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
new file mode 100644
index 0000000..2e8c538
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.functions.FunctionDefinition;
+
+/**
+ * A factory to create {@link FunctionDefinition}.
+ * See also {@link TableFactory} for more information.
+ */
+public interface FunctionDefinitionFactory extends TableFactory {
+
+       /**
+        * Creates a {@link FunctionDefinition} from the given {@link CatalogFunction}.
+        *
+        * @param name name of the {@link CatalogFunction}
+        * @param catalogFunction the catalog function
+        * @return a {@link FunctionDefinition}
+        */
+       FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala
index ea14fa9..b52d9e3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -20,12 +20,11 @@ package org.apache.flink.table.plan.metadata
 
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.{FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats}
 import org.apache.flink.table.{JDouble, JLong}
 import org.apache.flink.util.Preconditions
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
@@ -45,7 +44,6 @@ import org.junit.{Before, BeforeClass, Test}
 import org.powermock.api.mockito.PowerMockito._
 import org.powermock.core.classloader.annotations.PrepareForTest
 import org.powermock.modules.junit4.PowerMockRunner
-
 import java.math.BigDecimal
 
 import scala.collection.JavaConversions._
@@ -81,7 +79,8 @@ class AggCallSelectivityEstimatorTest {
     val tableScan = mock(classOf[TableScan])
     val cluster = mock(classOf[RelOptCluster])
     val planner = mock(classOf[AbstractRelOptPlanner])
-    val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
+    val catalogManager = mock(classOf[CatalogManager])
+    val functionCatalog = new FunctionCatalog(catalogManager)
     val context = new FlinkContextImpl(new TableConfig, functionCatalog)
     when(tableScan, "getCluster").thenReturn(cluster)
     when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 8d7c348..c1627a6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.metadata
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions.SumAggFunction.DoubleSumAggFunction
@@ -44,7 +44,6 @@ import org.apache.flink.table.runtime.rank.{ConstantRankRange, RankType, Variabl
 import org.apache.flink.table.types.AtomicDataType
 import org.apache.flink.table.types.logical.{BigIntType, DoubleType, IntType, LogicalType, TimestampKind, TimestampType, VarCharType}
 import org.apache.flink.table.util.CountAggFunction
-
 import com.google.common.collect.{ImmutableList, Lists}
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan._
@@ -64,7 +63,6 @@ import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.util.{DateString, ImmutableBitSet, ImmutableIntList, TimeString, TimestampString}
 import org.junit.{Before, BeforeClass}
-
 import java.math.BigDecimal
 import java.util
 
@@ -74,12 +72,17 @@ class FlinkRelMdHandlerTestBase {
 
   val tableConfig = new TableConfig()
   val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
+
+  val defaultCatalog = "default_catalog"
+  val catalogManager = new CatalogManager(
+    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+
   // TODO batch RelNode and stream RelNode should have different PlannerContext
   //  and RelOptCluster due to they have different trait definitions.
   val plannerContext: PlannerContext =
     new PlannerContext(
       tableConfig,
-      new FunctionCatalog("default_catalog", "default_database"),
+      new FunctionCatalog(catalogManager),
       CalciteSchema.from(rootSchema),
       util.Arrays.asList(
         ConventionTraitDef.INSTANCE,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala
index 8494ae1..2259d94 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala
@@ -20,12 +20,11 @@ package org.apache.flink.table.plan.metadata
 
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.{FlinkContext, FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats}
 import org.apache.flink.table.{JDouble, JLong}
 import org.apache.flink.util.Preconditions
-
 import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
@@ -42,7 +41,6 @@ import org.junit.{Before, BeforeClass, Test}
 import org.powermock.api.mockito.PowerMockito._
 import org.powermock.core.classloader.annotations.PrepareForTest
 import org.powermock.modules.junit4.PowerMockRunner
-
 import java.math.BigDecimal
 
 import scala.collection.JavaConverters._
@@ -83,7 +81,8 @@ class SelectivityEstimatorTest {
     val tableScan = mock(classOf[TableScan])
     val cluster = mock(classOf[RelOptCluster])
     val planner = mock(classOf[AbstractRelOptPlanner])
-    val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
+    val catalogManager = mock(classOf[CatalogManager])
+    val functionCatalog = new FunctionCatalog(catalogManager)
     val context: FlinkContext = new FlinkContextImpl(tableConfig, functionCatalog)
     when(tableScan, "getCluster").thenReturn(cluster)
     when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
index c963b1f..f8af378 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
@@ -18,9 +18,6 @@
 
 package org.apache.flink.table.plan.util
 
-import org.apache.flink.table.api.{DataTypes, Types}
-import org.apache.flink.table.catalog.FunctionCatalog
-import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
 import org.apache.flink.table.expressions.utils.Func1
 import org.apache.flink.table.functions.AggregateFunctionDefinition
@@ -29,7 +26,6 @@ import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.functions.utils.ScalarSqlFunction
 import org.apache.flink.table.plan.util.InputTypeBuilder.inputOf
 import org.apache.flink.table.util.{DateTimeTestUtil, IntSumAggFunction}
-
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.SqlPostfixOperator
@@ -40,21 +36,26 @@ import org.apache.calcite.util.{DateString, TimeString, TimestampString}
 import org.hamcrest.CoreMatchers.is
 import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, assertTrue}
 import org.junit.Test
-
 import java.math.BigDecimal
 import java.sql.Timestamp
-import java.util.{List => JList}
-import java.sql.{Date, Time, Timestamp}
 import java.util.{TimeZone, List => JList}
 
+import org.apache.flink.api.common.typeinfo.Types
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.expressions.{EqualTo, Expression, ExpressionBridge, ExpressionParser, GreaterThan, Literal, PlannerExpression, PlannerExpressionConverter, Sum, UnresolvedFieldReference}
+
 import scala.collection.JavaConverters._
 
 /**
   * Test for [[RexNodeExtractor]].
   */
 class RexNodeExtractorTest extends RexNodeTestBase {
+  val defaultCatalog = "default_catalog"
+  val catalogManager = new CatalogManager(
+    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
 
-  private val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
+  private val functionCatalog = new FunctionCatalog(catalogManager)
 
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
     new ExpressionBridge[PlannerExpression](
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 26a5268..38b5d12 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -985,8 +985,6 @@ object TestingTableEnvironment {
   def create(
       settings: EnvironmentSettings,
      catalogManager: Option[CatalogManager] = None): TestingTableEnvironment = {
-    val functionCatalog = new FunctionCatalog(
-      settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName)
     val catalogMgr = catalogManager match {
       case Some(c) => c
       case _ =>
@@ -994,6 +992,7 @@ object TestingTableEnvironment {
           new GenericInMemoryCatalog(
             settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
     }
+    val functionCatalog = new FunctionCatalog(catalogMgr)
     val plannerProperties = settings.toPlannerProperties
     val executorProperties = settings.toExecutorProperties
     val executor = ComponentFactoryService.find(classOf[ExecutorFactory],
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 13d6c53..3452af6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -64,9 +64,8 @@ abstract class TableEnvImpl(
   protected val builtinDatabaseName: String = catalogManager.getCurrentDatabase
 
   // Table API/SQL function catalog
-  private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(
-    builtinCatalogName,
-    builtinDatabaseName)
+  private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager)
+
   // temporary utility until we don't use planner expressions anymore
   functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index 0dc9805..9c16135 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -23,7 +23,7 @@ import java.util
 import org.apache.calcite.plan.RelOptRule.{none, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rex.RexProgram
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions.{Expression, PlannerExpression}
 import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
 import org.apache.flink.table.plan.util.RexProgramExtractor
@@ -37,6 +37,10 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
     operand(classOf[FlinkLogicalTableSourceScan], none)),
   "PushFilterIntoTableSourceScanRule") {
 
+  private val defaultCatalog = "default_catalog"
+  private val catalogManager = new CatalogManager(
+    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+
   override def matches(call: RelOptRuleCall): Boolean = {
     val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
     val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
@@ -68,7 +72,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
       RexProgramExtractor.extractConjunctiveConditions(
         program,
         call.builder().getRexBuilder,
-        new FunctionCatalog("default_catalog", "default_database"))
+        new FunctionCatalog(catalogManager))
     if (predicates.isEmpty) {
       // no condition can be translated to expression
       return
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 20e8ba6..11e43cf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -207,7 +207,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
       "default_catalog",
       new GenericInMemoryCatalog("default_catalog", "default_database"))
     val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
-    val functionCatalog = new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+    val functionCatalog = new FunctionCatalog(manager)
     val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager)
     val jTEnv = new JStreamTableEnvironmentImpl(
       manager,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index b56919d..c917267 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.{TableConfig, Types}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.delegation.{Executor, Planner}
 import org.apache.flink.table.functions.{AggregateFunction, AggregateFunctionDefinition}
 import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
@@ -66,9 +66,13 @@ class AggregateTest extends TableTestBase {
 
   @Test
   def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
-    val functionCatalog = new FunctionCatalog("cat", "db")
+    val defaultCatalog = "default_catalog"
+    val catalogManager = new CatalogManager(
+      defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+
+    val functionCatalog = new FunctionCatalog(catalogManager)
     val tablEnv = new StreamTableEnvironmentImpl(
-      Mockito.mock(classOf[CatalogManager]),
+      catalogManager,
       functionCatalog,
       new TableConfig,
       Mockito.mock(classOf[StreamExecutionEnvironment]),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
index fde9743..b752b76 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.{DateString, TimeString, TimestampString}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.util.{RexNodeToExpressionConverter, RexProgramExtractor}
 import org.apache.flink.table.utils.InputTypeBuilder.inputOf
@@ -42,8 +42,7 @@ import scala.collection.mutable
 class RexProgramExtractorTest extends RexProgramTestBase {
 
   private val functionCatalog: FunctionCatalog = new FunctionCatalog(
-    "default_catalog",
-    "default_database")
+    new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog")))
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
     new ExpressionBridge[PlannerExpression](
       functionCatalog,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index e5810e0..6e0c3a3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -329,8 +329,7 @@ case class StreamTableTestUtil(
   private val tableConfig = new TableConfig
  private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager())
   private val executor: StreamExecutor = new StreamExecutor(javaEnv)
-  private val functionCatalog =
-    new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+  private val functionCatalog = new FunctionCatalog(manager)
  private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager)
 
   val javaTableEnv = new JavaStreamTableEnvironmentImpl(
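
For readers of the new API: a catalog's TableFactory can now also implement the
new FunctionDefinitionFactory interface to serve function definitions, as
HiveTableFactory does above. Below is a minimal sketch against the APIs added
in this commit; the class name and the ScalarFunction assumption are
illustrative, not part of the commit.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.table.catalog.CatalogFunction;
    import org.apache.flink.table.factories.FunctionDefinitionFactory;
    import org.apache.flink.table.functions.FunctionDefinition;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.table.functions.ScalarFunctionDefinition;

    /**
     * Loads the class named by a CatalogFunction and wraps it as a
     * ScalarFunctionDefinition, analogous to the UDF branch in HiveTableFactory.
     */
    public class ReflectiveFunctionDefinitionFactory implements FunctionDefinitionFactory {

        @Override
        public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
            try {
                Class<?> clazz = Thread.currentThread().getContextClassLoader()
                    .loadClass(catalogFunction.getClassName());
                // Assumes the registered class is a Flink ScalarFunction; see
                // HiveTableFactory above for UDF/UDTF/UDAF-style dispatch instead.
                ScalarFunction function = (ScalarFunction) clazz.newInstance();
                return new ScalarFunctionDefinition(name, function);
            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                throw new IllegalArgumentException(
                    String.format("Cannot create a FunctionDefinition for class %s",
                        catalogFunction.getClassName()), e);
            }
        }

        // FunctionDefinitionFactory extends TableFactory, so its matching
        // contract must be implemented as well; empty stubs suffice for a sketch.
        @Override
        public Map<String, String> requiredContext() {
            return Collections.emptyMap();
        }

        @Override
        public List<String> supportedProperties() {
            return Collections.emptyList();
        }
    }

FunctionCatalog.lookupFunction picks such a factory up via
Catalog#getTableFactory() and otherwise falls back to the in-memory and
built-in functions.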
