This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d71c4ee [FLINK-13024][table] integrate FunctionCatalog with CatalogManager d71c4ee is described below commit d71c4eed25cbd7d183cafb29e8bfbb17df904cbd Author: bowen.li <bowenl...@gmail.com> AuthorDate: Fri Jul 5 15:37:37 2019 -0700 [FLINK-13024][table] integrate FunctionCatalog with CatalogManager This PR integrates FunctionCatalog with Catalog APIs. This closes #8920. --- .../batch/connectors/hive/HiveTableFactory.java | 106 ++++++++++++++++++- .../java/internal/StreamTableEnvironmentImpl.java | 5 +- .../table/api/internal/TableEnvironmentImpl.java | 5 +- .../apache/flink/table/catalog/CatalogManager.java | 31 ++++-- .../flink/table/catalog/FunctionCatalog.java | 114 +++++++++++++++------ .../internal/StreamTableEnvironmentImpl.scala | 5 +- .../org/apache/flink/table/catalog/Catalog.java | 4 +- .../table/factories/FunctionDefinitionFactory.java | 38 +++++++ .../metadata/AggCallSelectivityEstimatorTest.scala | 7 +- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 11 +- .../plan/metadata/SelectivityEstimatorTest.scala | 7 +- .../table/plan/util/RexNodeExtractorTest.scala | 17 +-- .../apache/flink/table/util/TableTestBase.scala | 3 +- .../flink/table/api/internal/TableEnvImpl.scala | 5 +- .../PushFilterIntoTableSourceScanRule.scala | 8 +- .../api/stream/StreamTableEnvironmentTest.scala | 2 +- .../flink/table/api/stream/sql/AggregateTest.scala | 10 +- .../flink/table/plan/RexProgramExtractorTest.scala | 5 +- .../apache/flink/table/utils/TableTestBase.scala | 3 +- 19 files changed, 296 insertions(+), 90 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java index a22014a..9c8beaa 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java @@ -18,13 +18,28 @@ package org.apache.flink.batch.connectors.hive; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.config.CatalogConfig; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; +import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; +import org.apache.flink.table.factories.FunctionDefinitionFactory; import org.apache.flink.table.factories.TableFactoryUtil; import org.apache.flink.table.factories.TableSinkFactory; import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.functions.AggregateFunctionDefinition; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.ScalarFunctionDefinition; +import org.apache.flink.table.functions.TableFunctionDefinition; +import org.apache.flink.table.functions.hive.HiveFunctionWrapper; +import org.apache.flink.table.functions.hive.HiveGenericUDAF; +import org.apache.flink.table.functions.hive.HiveGenericUDF; +import org.apache.flink.table.functions.hive.HiveGenericUDTF; +import org.apache.flink.table.functions.hive.HiveSimpleUDF; import org.apache.flink.table.sinks.OutputFormatTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.InputFormatTableSource; @@ -33,7 +48,15 @@ import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.UDAF; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; @@ -41,13 +64,19 @@ import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A table factory implementation for tables stored in Hive catalog. + * A table factory implementation for Hive catalog. */ -public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Row> { - private HiveConf hiveConf; +public class HiveTableFactory + implements TableSourceFactory<Row>, TableSinkFactory<Row>, FunctionDefinitionFactory { + private static final Logger LOG = LoggerFactory.getLogger(HiveTableFactory.class); + + private final HiveConf hiveConf; + private final String hiveVersion; public HiveTableFactory(HiveConf hiveConf) { this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); + + this.hiveVersion = new JobConf(hiveConf).get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion()); } @Override @@ -112,4 +141,75 @@ public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFacto return new HiveTableSink(new JobConf(hiveConf), tablePath, table); } + @Override + public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) { + String functionClassName = catalogFunction.getClassName(); + + if (Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC))) { + throw new TableException( + String.format("HiveFunctionDefinitionFactory does not support generic functions %s yet", name)); + } + + Class clazz; + try { + clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName); + + LOG.info("Successfully loaded Hive udf '{}' with class '{}'", name, functionClassName); + } catch (ClassNotFoundException e) { + throw new TableException( + String.format("Failed to initiate an instance of class %s.", functionClassName), e); + } + + if (UDF.class.isAssignableFrom(clazz)) { + LOG.info("Transforming Hive function '{}' into a HiveSimpleUDF", name); + + return new ScalarFunctionDefinition( + name, + new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName)) + ); + } else if (GenericUDF.class.isAssignableFrom(clazz)) { + LOG.info("Transforming Hive function '{}' into a HiveGenericUDF", name); + + return new ScalarFunctionDefinition( + name, + new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName)) + ); + } else if (GenericUDTF.class.isAssignableFrom(clazz)) { + LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name); + + HiveGenericUDTF udtf = new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName)); + + return new TableFunctionDefinition( + name, + udtf, + GenericTypeInfo.of(Row.class) + ); + } else if (GenericUDAFResolver2.class.isAssignableFrom(clazz) || UDAF.class.isAssignableFrom(clazz)) { + HiveGenericUDAF udaf; + + if (GenericUDAFResolver2.class.isAssignableFrom(clazz)) { + LOG.info( + "Transforming Hive function '{}' into a HiveGenericUDAF with no UDAF bridging and Hive version %s", + name, hiveVersion); + + udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), false, hiveVersion); + } else { + LOG.info( + "Transforming Hive function '{}' into a HiveGenericUDAF with UDAF bridging and Hive version %s", + name, hiveVersion); + + udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), true, hiveVersion); + } + + return new AggregateFunctionDefinition( + name, + udaf, + GenericTypeInfo.of(Object.class), + GenericTypeInfo.of(GenericUDAFEvaluator.AggregationBuffer.class) + ); + } else { + throw new IllegalArgumentException( + String.format("HiveFunctionDefinitionFactory cannot initiate FunctionDefinition for class %s", functionClassName)); + } + } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java index 86cd203..4d0586c 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java @@ -102,13 +102,12 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple EnvironmentSettings settings, TableConfig tableConfig) { - FunctionCatalog functionCatalog = new FunctionCatalog( - settings.getBuiltInCatalogName(), - settings.getBuiltInDatabaseName()); CatalogManager catalogManager = new CatalogManager( settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())); + FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager); + Map<String, String> executorProperties = settings.toExecutorProperties(); Executor executor = lookupExecutor(executorProperties, executionEnvironment); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 988fa88..f2aea62 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -123,13 +123,12 @@ public class TableEnvironmentImpl implements TableEnvironment { public static TableEnvironmentImpl create(EnvironmentSettings settings) { - FunctionCatalog functionCatalog = new FunctionCatalog( - settings.getBuiltInCatalogName(), - settings.getBuiltInDatabaseName()); CatalogManager catalogManager = new CatalogManager( settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())); + FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager); + Map<String, String> executorProperties = settings.toExecutorProperties(); Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties) .create(executorProperties); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 0f83176..c5d0bc7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -59,11 +59,14 @@ public class CatalogManager { // TO BE REMOVED along with ExternalCatalog API private Map<String, ExternalCatalog> externalCatalogs; - // The name of the default catalog and schema + // The name of the current catalog and database private String currentCatalogName; private String currentDatabaseName; + // The name of the default catalog + private final String defaultCatalogName; + /** * Temporary solution to handle both {@link CatalogBaseTable} and * {@link ExternalCatalogTable} in a single call. @@ -125,6 +128,7 @@ public class CatalogManager { catalogs.put(defaultCatalogName, defaultCatalog); this.currentCatalogName = defaultCatalogName; this.currentDatabaseName = defaultCatalog.getDefaultDatabase(); + this.defaultCatalogName = defaultCatalogName; } /** @@ -215,9 +219,9 @@ public class CatalogManager { } /** - * Gets the current default catalog that will be used when resolving table path. + * Gets the current catalog that will be used when resolving table path. * - * @return the current default catalog + * @return the current catalog * @see CatalogManager#resolveTable(String...) */ public String getCurrentCatalog() { @@ -225,9 +229,9 @@ public class CatalogManager { } /** - * Sets the current default catalog name that will be used when resolving table path. + * Sets the current catalog name that will be used when resolving table path. * - * @param catalogName catalog name to set as current default catalog + * @param catalogName catalog name to set as current catalog * @throws CatalogNotExistException thrown if the catalog doesn't exist * @see CatalogManager#resolveTable(String...) */ @@ -255,9 +259,9 @@ public class CatalogManager { } /** - * Gets the current default database name that will be used when resolving table path. + * Gets the current database name that will be used when resolving table path. * - * @return the current default database + * @return the current database * @see CatalogManager#resolveTable(String...) */ public String getCurrentDatabase() { @@ -265,10 +269,10 @@ public class CatalogManager { } /** - * Sets the current default database name that will be used when resolving a table path. + * Sets the current database name that will be used when resolving a table path. * The database has to exist in the current catalog. * - * @param databaseName database name to set as current default database name + * @param databaseName database name to set as current database name * @throws CatalogException thrown if the database doesn't exist in the current catalog * @see CatalogManager#resolveTable(String...) * @see CatalogManager#setCurrentCatalog(String) @@ -294,6 +298,15 @@ public class CatalogManager { } /** + * Gets the default catalog name. + * + * @return the default catalog + */ + public String getDefaultCatalogName() { + return defaultCatalogName; + } + + /** * Tries to resolve a table path to a {@link ResolvedTable}. The algorithm looks for requested table * in the following paths in that order: * <ol> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index 0d18ffe..af7f21f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -21,7 +21,10 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; +import org.apache.flink.table.factories.FunctionDefinitionFactory; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.AggregateFunctionDefinition; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; @@ -36,21 +39,26 @@ import org.apache.flink.table.functions.UserDefinedAggregateFunction; import org.apache.flink.table.functions.UserFunctionsTypeHelper; import org.apache.flink.util.Preconditions; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Simple function catalog to store {@link FunctionDefinition}s in memory. + * Simple function catalog to store {@link FunctionDefinition}s in catalogs. */ @Internal public class FunctionCatalog implements FunctionLookup { - private final String defaultCatalogName; - - private final String defaultDatabaseName; + private final CatalogManager catalogManager; + // For simplicity, currently hold registered Flink functions in memory here + // TODO: should move to catalog private final Map<String, FunctionDefinition> userFunctions = new LinkedHashMap<>(); /** @@ -58,11 +66,8 @@ public class FunctionCatalog implements FunctionLookup { */ private PlannerTypeInferenceUtil plannerTypeInferenceUtil; - public FunctionCatalog( - String defaultCatalogName, - String defaultDatabaseName) { - this.defaultCatalogName = defaultCatalogName; - this.defaultDatabaseName = defaultDatabaseName; + public FunctionCatalog(CatalogManager catalogManager) { + this.catalogManager = checkNotNull(catalogManager); } public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) { @@ -129,35 +134,81 @@ public class FunctionCatalog implements FunctionLookup { } public String[] getUserDefinedFunctions() { - return userFunctions.values().stream() - .map(FunctionDefinition::toString) - .toArray(String[]::new); + List<String> result = new ArrayList<>(); + + // Get functions in catalog + Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get(); + try { + result.addAll(catalog.listFunctions(catalogManager.getCurrentDatabase())); + } catch (DatabaseNotExistException e) { + // Ignore since there will always be a current database of the current catalog + } + + // Get functions registered in memory + result.addAll( + userFunctions.values().stream() + .map(FunctionDefinition::toString) + .collect(Collectors.toList())); + + return result.stream() + .collect(Collectors.toList()) + .toArray(new String[0]); } @Override public Optional<FunctionLookup.Result> lookupFunction(String name) { - final FunctionDefinition userCandidate = userFunctions.get(normalizeName(name)); - final Optional<FunctionDefinition> foundDefinition; - if (userCandidate != null) { - foundDefinition = Optional.of(userCandidate); - } else { + String functionName = normalizeName(name); - // TODO once we connect this class with the Catalog APIs we need to make sure that - // built-in functions are present in "root" built-in catalog. This allows to - // overwrite built-in functions but also fallback to the "root" catalog. It should be - // possible to disable the "root" catalog if that is desired. + FunctionDefinition userCandidate = null; - foundDefinition = BuiltInFunctionDefinitions.getDefinitions() - .stream() - .filter(f -> normalizeName(name).equals(normalizeName(f.getName()))) - .findFirst() - .map(Function.identity()); - } + Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get(); - return foundDefinition.map(definition -> new FunctionLookup.Result( - ObjectIdentifier.of(defaultCatalogName, defaultDatabaseName, name), - definition) - ); + if (catalog.getTableFactory().isPresent() && + catalog.getTableFactory().get() instanceof FunctionDefinitionFactory) { + try { + CatalogFunction catalogFunction = catalog.getFunction( + new ObjectPath(catalogManager.getCurrentDatabase(), functionName)); + + FunctionDefinitionFactory factory = (FunctionDefinitionFactory) catalog.getTableFactory().get(); + + userCandidate = factory.createFunctionDefinition(functionName, catalogFunction); + } catch (FunctionNotExistException e) { + // Ignore + } + + return Optional.of( + new FunctionLookup.Result( + ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name), + userCandidate) + ); + } else { + // Else, check in-memory functions + userCandidate = userFunctions.get(functionName); + + final Optional<FunctionDefinition> foundDefinition; + if (userCandidate != null) { + foundDefinition = Optional.of(userCandidate); + } else { + + // TODO once we connect this class with the Catalog APIs we need to make sure that + // built-in functions are present in "root" built-in catalog. This allows to + // overwrite built-in functions but also fallback to the "root" catalog. It should be + // possible to disable the "root" catalog if that is desired. + + foundDefinition = BuiltInFunctionDefinitions.getDefinitions() + .stream() + .filter(f -> functionName.equals(normalizeName(f.getName()))) + .findFirst() + .map(Function.identity()); + } + + String defaultCatalogName = catalogManager.getDefaultCatalogName(); + + return foundDefinition.map(definition -> new FunctionLookup.Result( + ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name), + definition) + ); + } } @Override @@ -169,6 +220,7 @@ public class FunctionCatalog implements FunctionLookup { } private void registerFunction(String name, FunctionDefinition functionDefinition) { + // TODO: should register to catalog userFunctions.put(normalizeName(name), functionDefinition); } diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala index c2b9831..1304c8c 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala @@ -275,13 +275,12 @@ object StreamTableEnvironmentImpl { tableConfig: TableConfig) : StreamTableEnvironmentImpl = { - val functionCatalog = new FunctionCatalog( - settings.getBuiltInCatalogName, - settings.getBuiltInDatabaseName) val catalogManager = new CatalogManager( settings.getBuiltInCatalogName, new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName)) + val functionCatalog = new FunctionCatalog(catalogManager) + val executorProperties = settings.toExecutorProperties val executor = lookupExecutor(executorProperties, executionEnvironment) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index dbb5c1b..8019ab0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -47,8 +47,8 @@ import java.util.Optional; public interface Catalog { /** - * Get an optional {@link TableFactory} instance that's responsible for generating source/sink for tables - * stored in this catalog. + * Get an optional {@link TableFactory} instance that's responsible for generating table-related + * instances stored in this catalog, instances such as source/sink and function definitions. * * @return an optional TableFactory instance */ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java new file mode 100644 index 0000000..2e8c538 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.functions.FunctionDefinition; + +/** + * A factory to create {@link FunctionDefinition}. + * See also {@link TableFactory} for more information. + */ +public interface FunctionDefinitionFactory extends TableFactory { + + /** + * Creates a {@link FunctionDefinition} from given {@link CatalogFunction}. + * + * @param name name of the {@link CatalogFunction} + * @param catalogFunction the catalog function + * @return a {@link FunctionDefinition} + */ + FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction); +} diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala index ea14fa9..b52d9e3 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala @@ -20,12 +20,11 @@ package org.apache.flink.table.plan.metadata import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.{FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem} -import org.apache.flink.table.catalog.FunctionCatalog +import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} import org.apache.flink.table.plan.schema._ import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats} import org.apache.flink.table.{JDouble, JLong} import org.apache.flink.util.Preconditions - import com.google.common.collect.ImmutableList import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster} import org.apache.calcite.rel.`type`.RelDataType @@ -45,7 +44,6 @@ import org.junit.{Before, BeforeClass, Test} import org.powermock.api.mockito.PowerMockito._ import org.powermock.core.classloader.annotations.PrepareForTest import org.powermock.modules.junit4.PowerMockRunner - import java.math.BigDecimal import scala.collection.JavaConversions._ @@ -81,7 +79,8 @@ class AggCallSelectivityEstimatorTest { val tableScan = mock(classOf[TableScan]) val cluster = mock(classOf[RelOptCluster]) val planner = mock(classOf[AbstractRelOptPlanner]) - val functionCatalog = new FunctionCatalog("default_catalog", "default_database") + val catalogManager = mock(classOf[CatalogManager]) + val functionCatalog = new FunctionCatalog(catalogManager) val context = new FlinkContextImpl(new TableConfig, functionCatalog) when(tableScan, "getCluster").thenReturn(cluster) when(cluster, "getRexBuilder").thenReturn(rexBuilder) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala index 8d7c348..c1627a6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.plan.metadata import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} -import org.apache.flink.table.catalog.FunctionCatalog +import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} import org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.aggfunctions.SumAggFunction.DoubleSumAggFunction @@ -44,7 +44,6 @@ import org.apache.flink.table.runtime.rank.{ConstantRankRange, RankType, Variabl import org.apache.flink.table.types.AtomicDataType import org.apache.flink.table.types.logical.{BigIntType, DoubleType, IntType, LogicalType, TimestampKind, TimestampType, VarCharType} import org.apache.flink.table.util.CountAggFunction - import com.google.common.collect.{ImmutableList, Lists} import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.plan._ @@ -64,7 +63,6 @@ import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable} import org.apache.calcite.sql.parser.SqlParserPos import org.apache.calcite.util.{DateString, ImmutableBitSet, ImmutableIntList, TimeString, TimestampString} import org.junit.{Before, BeforeClass} - import java.math.BigDecimal import java.util @@ -74,12 +72,17 @@ class FlinkRelMdHandlerTestBase { val tableConfig = new TableConfig() val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema() + + val defaultCatalog = "default_catalog" + val catalogManager = new CatalogManager( + defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database")) + // TODO batch RelNode and stream RelNode should have different PlannerContext // and RelOptCluster due to they have different trait definitions. val plannerContext: PlannerContext = new PlannerContext( tableConfig, - new FunctionCatalog("default_catalog", "default_database"), + new FunctionCatalog(catalogManager), CalciteSchema.from(rootSchema), util.Arrays.asList( ConventionTraitDef.INSTANCE, diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala index 8494ae1..2259d94 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala @@ -20,12 +20,11 @@ package org.apache.flink.table.plan.metadata import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.{FlinkContext, FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem} -import org.apache.flink.table.catalog.FunctionCatalog +import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} import org.apache.flink.table.plan.schema._ import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats} import org.apache.flink.table.{JDouble, JLong} import org.apache.flink.util.Preconditions - import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan @@ -42,7 +41,6 @@ import org.junit.{Before, BeforeClass, Test} import org.powermock.api.mockito.PowerMockito._ import org.powermock.core.classloader.annotations.PrepareForTest import org.powermock.modules.junit4.PowerMockRunner - import java.math.BigDecimal import scala.collection.JavaConverters._ @@ -83,7 +81,8 @@ class SelectivityEstimatorTest { val tableScan = mock(classOf[TableScan]) val cluster = mock(classOf[RelOptCluster]) val planner = mock(classOf[AbstractRelOptPlanner]) - val functionCatalog = new FunctionCatalog("default_catalog", "default_database") + val catalogManager = mock(classOf[CatalogManager]) + val functionCatalog = new FunctionCatalog(catalogManager) val context: FlinkContext = new FlinkContextImpl(tableConfig, functionCatalog) when(tableScan, "getCluster").thenReturn(cluster) when(cluster, "getRexBuilder").thenReturn(rexBuilder) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala index c963b1f..f8af378 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala @@ -18,9 +18,6 @@ package org.apache.flink.table.plan.util -import org.apache.flink.table.api.{DataTypes, Types} -import org.apache.flink.table.catalog.FunctionCatalog -import org.apache.flink.table.expressions._ import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral} import org.apache.flink.table.expressions.utils.Func1 import org.apache.flink.table.functions.AggregateFunctionDefinition @@ -29,7 +26,6 @@ import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.functions.utils.ScalarSqlFunction import org.apache.flink.table.plan.util.InputTypeBuilder.inputOf import org.apache.flink.table.util.{DateTimeTestUtil, IntSumAggFunction} - import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex.{RexBuilder, RexNode} import org.apache.calcite.sql.SqlPostfixOperator @@ -40,21 +36,26 @@ import org.apache.calcite.util.{DateString, TimeString, TimestampString} import org.hamcrest.CoreMatchers.is import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, assertTrue} import org.junit.Test - import java.math.BigDecimal import java.sql.Timestamp -import java.util.{List => JList} -import java.sql.{Date, Time, Timestamp} import java.util.{TimeZone, List => JList} +import org.apache.flink.api.common.typeinfo.Types +import org.apache.flink.table.api.DataTypes +import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} +import org.apache.flink.table.expressions.{EqualTo, Expression, ExpressionBridge, ExpressionParser, GreaterThan, Literal, PlannerExpression, PlannerExpressionConverter, Sum, UnresolvedFieldReference} + import scala.collection.JavaConverters._ /** * Test for [[RexNodeExtractor]]. */ class RexNodeExtractorTest extends RexNodeTestBase { + val defaultCatalog = "default_catalog" + val catalogManager = new CatalogManager( + defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database")) - private val functionCatalog = new FunctionCatalog("default_catalog", "default_database") + private val functionCatalog = new FunctionCatalog(catalogManager) private val expressionBridge: ExpressionBridge[PlannerExpression] = new ExpressionBridge[PlannerExpression]( diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala index 26a5268..38b5d12 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala @@ -985,8 +985,6 @@ object TestingTableEnvironment { def create( settings: EnvironmentSettings, catalogManager: Option[CatalogManager] = None): TestingTableEnvironment = { - val functionCatalog = new FunctionCatalog( - settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName) val catalogMgr = catalogManager match { case Some(c) => c case _ => @@ -994,6 +992,7 @@ object TestingTableEnvironment { new GenericInMemoryCatalog( settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName)) } + val functionCatalog = new FunctionCatalog(catalogMgr) val plannerProperties = settings.toPlannerProperties val executorProperties = settings.toExecutorProperties val executor = ComponentFactoryService.find(classOf[ExecutorFactory], diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 13d6c53..3452af6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -64,9 +64,8 @@ abstract class TableEnvImpl( protected val builtinDatabaseName: String = catalogManager.getCurrentDatabase // Table API/SQL function catalog - private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog( - builtinCatalogName, - builtinDatabaseName) + private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager) + // temporary utility until we don't use planner expressions anymore functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala index 0dc9805..9c16135 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala @@ -23,7 +23,7 @@ import java.util import org.apache.calcite.plan.RelOptRule.{none, operand} import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import org.apache.calcite.rex.RexProgram -import org.apache.flink.table.catalog.FunctionCatalog +import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} import org.apache.flink.table.expressions.{Expression, PlannerExpression} import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan} import org.apache.flink.table.plan.util.RexProgramExtractor @@ -37,6 +37,10 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule( operand(classOf[FlinkLogicalTableSourceScan], none)), "PushFilterIntoTableSourceScanRule") { + private val defaultCatalog = "default_catalog" + private val catalogManager = new CatalogManager( + defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database")) + override def matches(call: RelOptRuleCall): Boolean = { val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc] val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan] @@ -68,7 +72,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule( RexProgramExtractor.extractConjunctiveConditions( program, call.builder().getRexBuilder, - new FunctionCatalog("default_catalog", "default_database")) + new FunctionCatalog(catalogManager)) if (predicates.isEmpty) { // no condition can be translated to expression return diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala index 20e8ba6..11e43cf 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala @@ -207,7 +207,7 @@ class StreamTableEnvironmentTest extends TableTestBase { "default_catalog", new GenericInMemoryCatalog("default_catalog", "default_database")) val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv) - val functionCatalog = new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase) + val functionCatalog = new FunctionCatalog(manager) val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager) val jTEnv = new JStreamTableEnvironmentImpl( manager, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala index b56919d..c917267 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl import org.apache.flink.table.api.{TableConfig, Types} -import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} +import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} import org.apache.flink.table.delegation.{Executor, Planner} import org.apache.flink.table.functions.{AggregateFunction, AggregateFunctionDefinition} import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode} @@ -66,9 +66,13 @@ class AggregateTest extends TableTestBase { @Test def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = { - val functionCatalog = new FunctionCatalog("cat", "db") + val defaultCatalog = "default_catalog" + val catalogManager = new CatalogManager( + defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database")) + + val functionCatalog = new FunctionCatalog(catalogManager) val tablEnv = new StreamTableEnvironmentImpl( - Mockito.mock(classOf[CatalogManager]), + catalogManager, functionCatalog, new TableConfig, Mockito.mock(classOf[StreamExecutionEnvironment]), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala index fde9743..b752b76 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala @@ -28,7 +28,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR} import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.util.{DateString, TimeString, TimestampString} -import org.apache.flink.table.catalog.FunctionCatalog +import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.util.{RexNodeToExpressionConverter, RexProgramExtractor} import org.apache.flink.table.utils.InputTypeBuilder.inputOf @@ -42,8 +42,7 @@ import scala.collection.mutable class RexProgramExtractorTest extends RexProgramTestBase { private val functionCatalog: FunctionCatalog = new FunctionCatalog( - "default_catalog", - "default_database") + new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog"))) private val expressionBridge: ExpressionBridge[PlannerExpression] = new ExpressionBridge[PlannerExpression]( functionCatalog, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala index e5810e0..6e0c3a3 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala @@ -329,8 +329,7 @@ case class StreamTableTestUtil( private val tableConfig = new TableConfig private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager()) private val executor: StreamExecutor = new StreamExecutor(javaEnv) - private val functionCatalog = - new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase) + private val functionCatalog = new FunctionCatalog(manager) private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager) val javaTableEnv = new JavaStreamTableEnvironmentImpl(