This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d1b06ac [FLINK-14401][table][hive] create DefaultFunctionDefinitionFactory to instantiate regular java class-based udf d1b06ac is described below commit d1b06ac4e8ab033c699b656cb0f1d924b6bafc56 Author: bowen.li <bowenl...@gmail.com> AuthorDate: Tue Oct 15 16:21:29 2019 -0700 [FLINK-14401][table][hive] create DefaultFunctionDefinitionFactory to instantiate regular java class-based udf create FunctionDefinitionUtil to instantiate regular java class-based udf and add HiveFunctionDefinitionFactory to instantiate both flink and hive udf This closes #9908. --- .../flink/connectors/hive/HiveTableFactory.java | 103 +--------------- .../flink/table/catalog/hive/HiveCatalog.java | 7 ++ .../factories/HiveFunctionDefinitionFactory.java} | 100 ++------------- .../table/tests/test_catalog_completeness.py | 2 +- .../flink/table/catalog/FunctionCatalog.java | 24 ++-- .../table/functions/FunctionDefinitionUtil.java | 77 ++++++++++++ .../functions/FunctionDefinitionUtilTest.java | 135 +++++++++++++++++++++ .../org/apache/flink/table/catalog/Catalog.java | 12 +- .../table/factories/FunctionDefinitionFactory.java | 3 +- 9 files changed, 253 insertions(+), 210 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java index 235919c..89f76da 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java @@ -18,29 +18,13 @@ package org.apache.flink.connectors.hive; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.config.CatalogConfig; -import org.apache.flink.table.catalog.hive.client.HiveShim; -import org.apache.flink.table.catalog.hive.client.HiveShimLoader; -import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; -import org.apache.flink.table.factories.FunctionDefinitionFactory; import org.apache.flink.table.factories.TableFactoryUtil; import org.apache.flink.table.factories.TableSinkFactory; import org.apache.flink.table.factories.TableSourceFactory; -import org.apache.flink.table.functions.AggregateFunctionDefinition; -import org.apache.flink.table.functions.FunctionDefinition; -import org.apache.flink.table.functions.ScalarFunctionDefinition; -import org.apache.flink.table.functions.TableFunctionDefinition; -import org.apache.flink.table.functions.hive.HiveFunctionWrapper; -import org.apache.flink.table.functions.hive.HiveGenericUDAF; -import org.apache.flink.table.functions.hive.HiveGenericUDF; -import org.apache.flink.table.functions.hive.HiveGenericUDTF; -import org.apache.flink.table.functions.hive.HiveSimpleUDF; import org.apache.flink.table.sinks.OutputFormatTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.InputFormatTableSource; @@ -49,12 +33,6 @@ import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.UDAF; -import org.apache.hadoop.hive.ql.exec.UDF; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,20 +46,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * A table factory implementation for Hive catalog. */ public class HiveTableFactory - implements TableSourceFactory<Row>, TableSinkFactory<Row>, FunctionDefinitionFactory { + implements TableSourceFactory<Row>, TableSinkFactory<Row> { private static final Logger LOG = LoggerFactory.getLogger(HiveTableFactory.class); private final HiveConf hiveConf; - private final String hiveVersion; - private final HiveShim hiveShim; public HiveTableFactory(HiveConf hiveConf) { this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); - - // this has to come from hiveConf, otherwise we may lose what user specifies in the yaml file - this.hiveVersion = checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION), - "Hive version is not defined"); - this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); } @Override @@ -145,76 +116,4 @@ public class HiveTableFactory private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) { return new HiveTableSink(new JobConf(hiveConf), tablePath, table); } - - @Override - public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) { - String functionClassName = catalogFunction.getClassName(); - - if (Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC))) { - throw new TableException( - String.format("HiveFunctionDefinitionFactory does not support generic functions %s yet", name)); - } - - Class clazz; - try { - clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName); - - LOG.info("Successfully loaded Hive udf '{}' with class '{}'", name, functionClassName); - } catch (ClassNotFoundException e) { - throw new TableException( - String.format("Failed to initiate an instance of class %s.", functionClassName), e); - } - - if (UDF.class.isAssignableFrom(clazz)) { - LOG.info("Transforming Hive function '{}' into a HiveSimpleUDF", name); - - return new ScalarFunctionDefinition( - name, - new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim) - ); - } else if (GenericUDF.class.isAssignableFrom(clazz)) { - LOG.info("Transforming Hive function '{}' into a HiveGenericUDF", name); - - return new ScalarFunctionDefinition( - name, - new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim) - ); - } else if (GenericUDTF.class.isAssignableFrom(clazz)) { - LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name); - - HiveGenericUDTF udtf = new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName), hiveShim); - - return new TableFunctionDefinition( - name, - udtf, - GenericTypeInfo.of(Row.class) - ); - } else if (GenericUDAFResolver2.class.isAssignableFrom(clazz) || UDAF.class.isAssignableFrom(clazz)) { - HiveGenericUDAF udaf; - - if (GenericUDAFResolver2.class.isAssignableFrom(clazz)) { - LOG.info( - "Transforming Hive function '{}' into a HiveGenericUDAF with no UDAF bridging and Hive version %s", - name, hiveVersion); - - udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), false, hiveVersion); - } else { - LOG.info( - "Transforming Hive function '{}' into a HiveGenericUDAF with UDAF bridging and Hive version %s", - name, hiveVersion); - - udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), true, hiveVersion); - } - - return new AggregateFunctionDefinition( - name, - udaf, - GenericTypeInfo.of(Object.class), - GenericTypeInfo.of(GenericUDAFEvaluator.AggregationBuffer.class) - ); - } else { - throw new IllegalArgumentException( - String.format("HiveFunctionDefinitionFactory cannot initiate FunctionDefinition for class %s", functionClassName)); - } - } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 7914c89..1e66551 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -56,11 +56,13 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper; import org.apache.flink.table.catalog.hive.client.HiveShim; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; +import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory; import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.catalog.hive.util.HiveStatsUtil; import org.apache.flink.table.catalog.hive.util.HiveTableUtil; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.factories.FunctionDefinitionFactory; import org.apache.flink.table.factories.TableFactory; import org.apache.flink.util.StringUtils; @@ -202,6 +204,11 @@ public class HiveCatalog extends AbstractCatalog { return Optional.of(new HiveTableFactory(hiveConf)); } + @Override + public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() { + return Optional.of(new HiveFunctionDefinitionFactory(hiveConf)); + } + // ------ databases ------ @Override diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java similarity index 62% copy from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java index 235919c..3e32c7f 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java @@ -16,24 +16,20 @@ * limitations under the License. */ -package org.apache.flink.connectors.hive; +package org.apache.flink.table.catalog.hive.factories; import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.connectors.hive.HiveTableFactory; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; -import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.config.CatalogConfig; import org.apache.flink.table.catalog.hive.client.HiveShim; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; import org.apache.flink.table.factories.FunctionDefinitionFactory; -import org.apache.flink.table.factories.TableFactoryUtil; -import org.apache.flink.table.factories.TableSinkFactory; -import org.apache.flink.table.factories.TableSourceFactory; import org.apache.flink.table.functions.AggregateFunctionDefinition; import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionDefinitionUtil; import org.apache.flink.table.functions.ScalarFunctionDefinition; import org.apache.flink.table.functions.TableFunctionDefinition; import org.apache.flink.table.functions.hive.HiveFunctionWrapper; @@ -41,12 +37,7 @@ import org.apache.flink.table.functions.hive.HiveGenericUDAF; import org.apache.flink.table.functions.hive.HiveGenericUDF; import org.apache.flink.table.functions.hive.HiveGenericUDTF; import org.apache.flink.table.functions.hive.HiveSimpleUDF; -import org.apache.flink.table.sinks.OutputFormatTableSink; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.sources.InputFormatTableSource; -import org.apache.flink.table.sources.TableSource; import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.UDAF; @@ -55,106 +46,35 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; -import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.Map; - import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A table factory implementation for Hive catalog. + * A factory to instantiate Hive UDFs as Flink UDFs. */ -public class HiveTableFactory - implements TableSourceFactory<Row>, TableSinkFactory<Row>, FunctionDefinitionFactory { +public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory { private static final Logger LOG = LoggerFactory.getLogger(HiveTableFactory.class); - private final HiveConf hiveConf; private final String hiveVersion; private final HiveShim hiveShim; - public HiveTableFactory(HiveConf hiveConf) { - this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); - + public HiveFunctionDefinitionFactory(HiveConf hiveConf) { // this has to come from hiveConf, otherwise we may lose what user specifies in the yaml file this.hiveVersion = checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION), - "Hive version is not defined"); + "Hive version is not defined"); this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); } @Override - public Map<String, String> requiredContext() { - throw new UnsupportedOperationException(); - } - - @Override - public List<String> supportedProperties() { - throw new UnsupportedOperationException(); - } - - @Override - public TableSink<Row> createTableSink(Map<String, String> properties) { - throw new UnsupportedOperationException(); - } - - @Override - public TableSource<Row> createTableSource(Map<String, String> properties) { - throw new UnsupportedOperationException(); - } - - @Override - public TableSource<Row> createTableSource(ObjectPath tablePath, CatalogTable table) { - Preconditions.checkNotNull(table); - Preconditions.checkArgument(table instanceof CatalogTableImpl); - - boolean isGeneric = Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC)); - - if (!isGeneric) { - return createInputFormatTableSource(tablePath, table); - } else { - return TableFactoryUtil.findAndCreateTableSource(table); - } - } - - /** - * Creates and configures a {@link org.apache.flink.table.sources.InputFormatTableSource} using the given {@link CatalogTable}. - */ - private InputFormatTableSource<Row> createInputFormatTableSource(ObjectPath tablePath, CatalogTable table) { - return new HiveTableSource(new JobConf(hiveConf), tablePath, table); - } - - @Override - public TableSink<Row> createTableSink(ObjectPath tablePath, CatalogTable table) { - Preconditions.checkNotNull(table); - Preconditions.checkArgument(table instanceof CatalogTableImpl); - - boolean isGeneric = Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC)); - - if (!isGeneric) { - return createOutputFormatTableSink(tablePath, table); - } else { - return TableFactoryUtil.findAndCreateTableSink(table); - } - } - - /** - * Creates and configures a {@link org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link CatalogTable}. - */ - private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) { - return new HiveTableSink(new JobConf(hiveConf), tablePath, table); - } - - @Override public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) { - String functionClassName = catalogFunction.getClassName(); - if (Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC))) { - throw new TableException( - String.format("HiveFunctionDefinitionFactory does not support generic functions %s yet", name)); + FunctionDefinitionUtil.createFunctionDefinition(name, catalogFunction); } + String functionClassName = catalogFunction.getClassName(); + Class clazz; try { clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName); diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py index 9474c30..003c144 100644 --- a/flink-python/pyflink/table/tests/test_catalog_completeness.py +++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py @@ -40,7 +40,7 @@ class CatalogAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCa @classmethod def excluded_methods(cls): # open/close are not needed in Python API as they are used internally - return {'open', 'close', 'getTableFactory'} + return {'open', 'close', 'getTableFactory', 'getFunctionDefinitionFactory'} class CatalogDatabaseAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase): diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index 43280b4..6808216 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -25,11 +25,11 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; -import org.apache.flink.table.factories.FunctionDefinitionFactory; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.AggregateFunctionDefinition; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionDefinitionUtil; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.ScalarFunctionDefinition; import org.apache.flink.table.functions.TableAggregateFunction; @@ -242,21 +242,17 @@ public class FunctionCatalog implements FunctionLookup { CatalogFunction catalogFunction = catalog.getFunction( new ObjectPath(catalogManager.getCurrentDatabase(), functionName)); - if (catalog.getTableFactory().isPresent() && - catalog.getTableFactory().get() instanceof FunctionDefinitionFactory) { - - FunctionDefinitionFactory factory = (FunctionDefinitionFactory) catalog.getTableFactory().get(); - - userCandidate = factory.createFunctionDefinition(functionName, catalogFunction); - - return Optional.of( - new FunctionLookup.Result( - ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name), - userCandidate) - ); + if (catalog.getFunctionDefinitionFactory().isPresent()) { + userCandidate = catalog.getFunctionDefinitionFactory().get().createFunctionDefinition(functionName, catalogFunction); } else { - // TODO: should go through function definition discover service + userCandidate = FunctionDefinitionUtil.createFunctionDefinition(functionName, catalogFunction); } + + return Optional.of( + new FunctionLookup.Result( + ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name), + userCandidate) + ); } catch (FunctionNotExistException e) { // Ignore } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java new file mode 100644 index 0000000..98cedaf --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.table.catalog.CatalogFunction; + +/** + * A util to instantiate {@link FunctionDefinition} in the default way. + */ +public class FunctionDefinitionUtil { + + public static FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) { + // Currently only handles Java class-based functions + Object func; + try { + func = Thread.currentThread().getContextClassLoader().loadClass(catalogFunction.getClassName()).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new IllegalStateException( + String.format("Failed instantiating '%s'", catalogFunction.getClassName()) + ); + } + + UserDefinedFunction udf = (UserDefinedFunction) func; + + if (udf instanceof ScalarFunction) { + return new ScalarFunctionDefinition( + name, + (ScalarFunction) udf + ); + } else if (udf instanceof TableFunction) { + TableFunction t = (TableFunction) udf; + return new TableFunctionDefinition( + name, + t, + t.getResultType() + ); + } else if (udf instanceof AggregateFunction) { + AggregateFunction a = (AggregateFunction) udf; + + return new AggregateFunctionDefinition( + name, + a, + a.getAccumulatorType(), + a.getResultType() + ); + } else if (udf instanceof TableAggregateFunction) { + TableAggregateFunction a = (TableAggregateFunction) udf; + + return new TableAggregateFunctionDefinition( + name, + a, + a.getAccumulatorType(), + a.getResultType() + ); + } else { + throw new UnsupportedOperationException( + String.format("Function %s should be of ScalarFunction, TableFunction, AggregateFunction, or TableAggregateFunction", catalogFunction.getClassName()) + ); + } + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java new file mode 100644 index 0000000..84cb81c --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.CatalogFunctionImpl; + +import org.junit.Test; + +import java.util.Collections; + +import static org.junit.Assert.assertTrue; + +/** + * Test for {@link FunctionDefinitionUtil}. + */ +public class FunctionDefinitionUtilTest { + @Test + public void testScalarFunction() { + FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition( + "test", + new CatalogFunctionImpl(TestScalarFunction.class.getName(), Collections.emptyMap()) + ); + + assertTrue(((ScalarFunctionDefinition) fd).getScalarFunction() instanceof TestScalarFunction); + } + + @Test + public void testTableFunction() { + FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition( + "test", + new CatalogFunctionImpl(TestTableFunction.class.getName(), Collections.emptyMap()) + ); + + assertTrue(((TableFunctionDefinition) fd).getTableFunction() instanceof TestTableFunction); + } + + @Test + public void testAggregateFunction() { + FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition( + "test", + new CatalogFunctionImpl(TestAggFunction.class.getName(), Collections.emptyMap()) + ); + + assertTrue(((AggregateFunctionDefinition) fd).getAggregateFunction() instanceof TestAggFunction); + } + + @Test + public void testTableAggregateFunction() { + FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition( + "test", + new CatalogFunctionImpl(TestTableAggFunction.class.getName(), Collections.emptyMap()) + ); + + assertTrue(((TableAggregateFunctionDefinition) fd).getTableAggregateFunction() instanceof TestTableAggFunction); + } + + /** + * Test function. + */ + public static class TestScalarFunction extends ScalarFunction { + + } + + /** + * Test function. + */ + public static class TestTableFunction extends TableFunction { + @Override + public TypeInformation getResultType() { + return TypeInformation.of(Object.class); + } + } + + /** + * Test function. + */ + public static class TestAggFunction extends AggregateFunction { + @Override + public Object createAccumulator() { + return null; + } + + @Override + public TypeInformation getResultType() { + return TypeInformation.of(Object.class); + } + + @Override + public TypeInformation getAccumulatorType() { + return TypeInformation.of(Object.class); + } + + @Override + public Object getValue(Object accumulator) { + return null; + } + } + + /** + * Test function. + */ + public static class TestTableAggFunction extends TableAggregateFunction { + @Override + public Object createAccumulator() { + return null; + } + + @Override + public TypeInformation getResultType() { + return TypeInformation.of(Object.class); + } + + @Override + public TypeInformation getAccumulatorType() { + return TypeInformation.of(Object.class); + } + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index 8019ab0..8f210b9 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -34,6 +34,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.catalog.exceptions.TablePartitionedException; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.factories.FunctionDefinitionFactory; import org.apache.flink.table.factories.TableFactory; import java.util.List; @@ -48,7 +49,7 @@ public interface Catalog { /** * Get an optional {@link TableFactory} instance that's responsible for generating table-related - * instances stored in this catalog, instances such as source/sink and function definitions. + * instances stored in this catalog, instances such as source/sink. * * @return an optional TableFactory instance */ @@ -57,6 +58,15 @@ public interface Catalog { } /** + * Get an optional {@link FunctionDefinitionFactory} instance that's responsible for instantiating function definitions. + * + * @return an optional FunctionDefinitionFactory instance + */ + default Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() { + return Optional.empty(); + } + + /** * Open the catalog. Used for any required preparation in initialization phase. * * @throws CatalogException in case of any runtime exception diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java index 2e8c538..1e4d76b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java @@ -23,9 +23,8 @@ import org.apache.flink.table.functions.FunctionDefinition; /** * A factory to create {@link FunctionDefinition}. - * See also {@link TableFactory} for more information. */ -public interface FunctionDefinitionFactory extends TableFactory { +public interface FunctionDefinitionFactory { /** * Creates a {@link FunctionDefinition} from given {@link CatalogFunction}.